我有一个Spark任务,我需要在每个微批处理中写入SQL查询的输出。写入操作是一项昂贵的操作,导致批处理执行时间超过批处理间隔。如何在Spark Streaming应用程序中异步写入行以加快批处理执行速度?
我正在寻找提高写入性能的方法。
正在单独的线程中执行写操作,如下面的一个好选项所示?
这是否会导致任何副作用,因为Spark本身以分布式方式执行?
是否有其他更好的方法来加速写入?
// Create a fixed thread pool to execute asynchronous tasks val executorService = Executors.newFixedThreadPool(2) dstream.foreachRDD { rdd => import org.apache.spark.sql._ val spark = SparkSession.builder.config(rdd.sparkContext.getConf).getOrCreate import spark.implicits._ import spark.sql val records = rdd.toDF("record") records.createOrReplaceTempView("records") val result = spark.sql("select * from records") // Submit a asynchronous task to write executorService.submit { new Runnable { override def run(): Unit = { result.write.parquet(output) } } } }
感谢您的详细解释!这绝对有帮助。将尝试改进性能的替代选项。 – vijay
@vijay你的问题是回答?考虑接受答案关闭它。 – maasg