2016-12-05 69 views
1

我有一个关于火花的非常基本的问题。我通常使用50个核心运行火花作业。在查看工作进度的同时,大多数时候它显示50个并行运行的进程(按照它应该这样做),但有时它只显示2个或4个并行运行的进程。就像这样:火花核心与任务并发

[Stage 8:================================>      (297 + 2)/500] 

的RDD的正在处理的是repartitioned 100多个分区。所以这不应该是一个问题。

虽然我有一个意见。我已经看到了大多数情况发生的模式,SparkUI中的数据局部性显示为NODE_LOCAL,而其他时候所有50个进程都在运行,其中一些进程显示为RACK_LOCAL。 这让我怀疑,也许会发生这种情况,因为在同一节点中处理数据之前会缓存数据以避免网络开销,并且这会降低进一步的处理速度。

如果是这样的话,什么是避免它的方式。如果情况并非如此,那么这里发生了什么?

回答

1

一个星期后,以上这个问题挣扎,我想我已经找到是什么原因导致的问题。

如果您正在使用同样的问题挣扎,好点开始将检查星火实例配置的罚款。关于它有一个伟大的cloudera blog post

但是,如果问题不在于配置(这一点与我的情况),那么问题是某处你的代码中。问题在于,有时由于不同的原因(偏斜的连接,数据源中不均匀的分区等),您正在使用的RDD在2-3个分区上获取大量数据,其余分区的数据很少。

为了降低整个网络的数据混洗,火花尝试,每个执行器处理该数据在该节点上本地驻留。因此,2-3位执行者正在工作很长一段时间,而其他执行者只需在几毫秒内完成数据。这就是为什么我遇到了我在上述问题中描述的问题。

调试这个问题的办法是首先的检查RDD的分区大小。如果一个或几个分区与其他分区相比非常大,那么下一步就是在大分区中查找记录,这样您就可以知道,特别是在发生偏斜连接的情况下,哪个键会发生偏斜。我已经写了一个小功能调试此:

from itertools import islice 
def check_skewness(df): 
    sampled_rdd = df.sample(False,0.01).rdd.cache() # Taking just 1% sample for fast processing 
    l = sampled_rdd.mapPartitionsWithIndex(lambda x,it: [(x,sum(1 for _ in it))]).collect() 
    max_part = max(l,key=lambda item:item[1]) 
    min_part = min(l,key=lambda item:item[1]) 
    if max_part[1]/min_part[1] > 5: #if difference is greater than 5 times 
     print 'Partitions Skewed: Largest Partition',max_part,'Smallest Partition',min_part,'\nSample Content of the largest Partition: \n' 
     print (sampled_rdd.mapPartitionsWithIndex(lambda i, it: islice(it, 0, 5)  if i == max_part[0] else []).take(5)) 
    else: 
     print 'No Skewness: Largest Partition',max_part,'Smallest Partition',min_part 

它给我的最小和最大分区的大小,如果这两者之间的差值超过5倍,它打印的最大分区的5个元素,应该给你一个大概的想法。

一旦你发现问题是偏斜的分区,你可以找到一种方法来摆脱偏斜的关键,或者你可以重新分区你的数据框,这将迫使它得到平均分配,现在看到所有的执行者都会在相同的时间内工作,并且你会看到更少的可怕OOM错误,并且处理速度也会非常快。

这些仅仅是我作为Spark新手的两分钱,我希望Spark专家可以在这个问题上增加一些内容,因为我认为Spark世界中的很多新手常常面临类似的问题。