2017-10-09 212 views
0

我想控制SparkDDD中每个阶段正在处理的并发分区的数量。 .repartition(...)不是解决方案,因为它只是修改一个阶段的分区总数,而不是正在处理的数量。Apache Spark - 限制每个阶段的并发线程数

通常情况下,您通过使用--executor-cores--num-executors参数来限制并发分区的数量。这是不准确的,因为处理阶段可以交错等

我想要完成的主要事情是从具有某些资源限制(并发连接)的数据库的dataload进程 - 但我不希望这些数据库资源限制来决定我的其他火花进程或RDD的并发性。我也不想在流程开始时强制非常大的分区,这些分区必须进一步拆分和重新分配。

这似乎是一个合理的预期,但乍一看不是可以在Spark API内完成的事情。

实施例(一些伪代码)

rdd = pseudoReadFromJDBC(partitions = 500,parallelism=10) 
    .repartition(100) 
    .parallelism(50) 
    .operatorOnRDD(); 

因此,在这种情况下,在第一阶段,我会分裂成500个较小的数据集从JDBC查询检索的数据。但是,我将Spark限制为仅允许同时运行10个线程,因此我最多只能同时打开10个JDBC连接。其他分区只会排队。

然后在第二阶段,我可能会重新分区,但更重要的是,我想选择更高程度的实际并行性,因为我不再限制数据库允许有限数量的同时连接。

这就是我所说的每个阶段改变它的意思。

+0

我的猜测:这是“输入格式”控制的东西,而不是火花。例如。 JDBC连接器有一个numPartitions设置,允许设置并发连接的数量(假设有足够的执行者可以访问它)。 elasticsearch输入格式将匹配碎片的数量。实木复合地板输入格式将匹配输入文件的数量......分割的数量是火花给出的东西,而不是它所作用的东西。 (我的知识水平在这个问题上是有限的,我不完全相信这个问题可以解决你的问题)。 – GPI

+0

InputFormat和numParttitions等会影响分片或分区的数量 - 它并不指定并发性。例如,我可以设置1000个分区,但可能只有100个分区将并行处理。 Parallelism并非仅仅由执行者的数量决定,而是由执行者核心的数量*执行者的数量决定的。这只是它运行的环境,但我想在执行过程中进一步缩小范围。 – YoYo

回答

0

有一个参数spark.default.parallelism。你可以尝试改变这个值。

+0

与我列出的其他两个参数一样,它又影响整个Spark提交,而不仅仅是RDD中的一个阶段。此外,它只影响分区的总数,而不是实际的并行性(与名称相反)。 – YoYo