2

我有一个Spark任务,我需要在每个微批处理中写入SQL查询的输出。写入操作是一项昂贵的操作,导致批处理执行时间超过批处理间隔。如何在Spark Streaming应用程序中异步写入行以加快批处理执行速度?

我正在寻找提高写入性能的方法。

  1. 正在单独的线程中执行写操作,如下面的一个好选项所示?

  2. 这是否会导致任何副作用,因为Spark本身以分布式方式执行?

  3. 是否有其他更好的方法来加速写入?

    // Create a fixed thread pool to execute asynchronous tasks 
    val executorService = Executors.newFixedThreadPool(2) 
    dstream.foreachRDD { rdd => 
        import org.apache.spark.sql._ 
        val spark = SparkSession.builder.config(rdd.sparkContext.getConf).getOrCreate 
        import spark.implicits._ 
        import spark.sql 
    
        val records = rdd.toDF("record") 
        records.createOrReplaceTempView("records") 
        val result = spark.sql("select * from records") 
    
        // Submit a asynchronous task to write 
        executorService.submit { 
        new Runnable { 
         override def run(): Unit = { 
         result.write.parquet(output) 
         } 
        } 
        } 
    } 
    

回答

3

1 - 在一个单独的线程中异步执行写操作,如下面的一个好选项所示?

号了解这里的问题关键是要问“是谁做的写”。写入是通过为集群中的执行程序分配的作业资源完成的。将写入命令放置在异步线程池上就像是将新的办公室管理器添加到具有固定员工的办公室。考虑到他们必须共享相同的员工,两名管理人员能够比单独工作做更多的工作吗?那么,一个合理的答案是“只有当第一位经理没有给他们足够的工作,所以有一些免费的能力”。

回到我们的集群,我们正在处理的是在IO重写操作。并行写作业将导致争用IO资源,从而使每个独立作业更长。最初,我们的工作看起来可能比“单一经理版”更好,但麻烦最终会打击我们。 我制作了一张图表,试图说明这是如何工作的。请注意,并行作业将花费比例更长的时间,以使它们在时间线上并发。

sequential vs parallel jobs in Spark Streaming

一旦我们达到这个地步的工作开始得到延迟,我们有一个稳定的工作,最终将失败。

2-这会引起任何的副作用,因为火花本身以分布式的方式执行?

我能想到的一些影响:

  • 可能更高集群的负载和IO争。
  • 作业在线程池队列上而不是在Spark Streaming Queue上排队。我们放弃了通过Spark UI和监视API监视我们的工作的能力,因为从Spark Streaming的角度来看,延迟是“隐藏的”并且一切正常。

3-是否有其他更好的方法来加速写入? (从廉价责令贵)

  • 如果要追加到拼花文件,通常会创建一个新的文件。追加随时间而变得昂贵。
  • 增加批处理间隔或使用窗口操作来编写更大的Parquet块。实木复合地板喜欢大文件
  • 调分区和数据的分配比例=>确保星火可以做平行
  • 提高群集资源的写,如果有必要增加更多的节点
  • 使用速度更快的存储
+0

感谢您的详细解释!这绝对有帮助。将尝试改进性能的替代选项。 – vijay

+0

@vijay你的问题是回答?考虑接受答案关闭它。 – maasg

1

是做在一个单独的线程的写入操作异步像下面所示的一个很好的选择?

是的。在优化昂贵的查询并将结果保存到外部数据存储时,这当然是需要考虑的事情。

这是否会导致任何副作用,因为Spark本身以分布式方式执行?

不这么认为。 SparkContext是线程安全的,并促进这种查询执行。

是否有其他的/更好的方法来加快写入?

是的!这是了解何时使用其他(上述)选项的关键。默认情况下,Spark应用程序以FIFO调度模式运行。

报价Scheduling Within an Application

默认情况下,星火的调度运行工作在FIFO方式。每个工作分为“阶段”(例如地图和缩小阶段),第一份工作优先考虑所有可用资源,其阶段有任务启动,然后第二份工作优先等等。如果工作在队列不需要使用整个集群,以后的作业可以立即开始运行,但是如果队列头部的作业很大,则后面的作业可能会显着延迟。

从Spark 0.8开始,也可以在作业之间配置公平共享。在公平分享下,Spark以“循环”方式在作业之间分配任务,以便所有作业获得大致相等的群集资源份额。这意味着在长时间工作时提交的短工可以立即开始接收资源,并且仍然可以获得良好的响应时间,而无需等待长时间的工作。该模式最适合多用户设置。

这意味着,做出一个房间异步执行多次写入和并行你应该配置你的星火应用程序使用公平调度模式(使用spark.scheduler.mode属性)。

您必须配置所谓的Fair Scheduler Pools以将执行程序资源(CPU和内存)“划分”到可使用spark.scheduler.pool属性分配给作业的池中。

报价Fair Scheduler Pools

没有任何干预,新提交的工作进入一个默认池,但乔布斯的池可以在多数民众赞成提交他们的线程加入spark.scheduler.pool‘本地属性’的SparkContext设置。

+0

谢谢,会试试这个。 – vijay

相关问题