2017-08-17 122 views
1

我们正在运行以下阶段DAG和经历较长洗牌阅读时间相对较小洗牌的数据尺寸(约每任务19MB)星火洗牌读需要显著时间小数据

enter image description here

一个有趣的方面是每个执行器/服务器中的等待任务具有相同的洗牌读取时间。下面是一个例子:对于下面的服务器,一组任务等待约7.7分钟,另一组等待约26秒。

enter image description here

下面是从同一阶段运行的另一个例子。该图显示了3个执行者/服务器,每个具有相同的洗牌读取时间的统一的任务组。蓝组表示,由于推测执行打死任务:

enter image description here

并非所有的执行者都是这样的。有些人几乎可以在几秒内完成所有任务,而且这些任务的远程读取数据的大小与在其他服务器上等待很长时间的大小相同。 此外,这种类型的阶段在我们的应用程序运行时间内运行两次。产生这些具有较大洗牌读取时间的任务组的服务器/执行者在每个阶段运行中都是不同的。

下面是西弗斯/ hosts中的一个任务统计数据表的例子:

enter image description here

看起来负责该DAG的代码如下:

output.write.parquet("output.parquet") 
comparison.write.parquet("comparison.parquet") 
output.union(comparison).write.parquet("output_comparison.parquet") 
val comparison = data.union(output).except(data.intersect(output)).cache() 
comparison.filter(_.abc != "M").count() 

我们将高度赞赏你的想法。

+1

奇怪。代码和数据样本将不胜感激。我看到DAG的每一步都有缓存调用,你缓存了一切吗? – Garren

+0

你好。谢谢你的问题。我在上面的描述中发布了代码。我们只在我们认为需要时才缓存。 – Dimon

+0

除了和相交的呼叫是我的担心。您的DAG引用了sortmergejoin;你知道哪条线路造成了麻烦吗? – Garren

回答

0

显然问题是JVM垃圾回收(GC)。任务必须等到GC在远程执行器上完成。相同的洗牌读取时间是由于几个任务正在等待执行GC的单个远程主机的事实。我们按照提示here,问题减少了一个数量级。远程主机上的GC时间与本地shuffle读取时间之间的关联仍然很小。将来我们会考虑尝试洗牌服务。