一个星期后,以上这个问题挣扎,我想我已经找到是什么原因导致的问题。
如果您正在使用同样的问题挣扎,好点开始将检查星火实例配置的罚款。关于它有一个伟大的cloudera blog post。
但是,如果问题不在于配置(这一点与我的情况),那么问题是某处你的代码中。问题在于,有时由于不同的原因(偏斜的连接,数据源中不均匀的分区等),您正在使用的RDD在2-3个分区上获取大量数据,其余分区的数据很少。
为了降低整个网络的数据混洗,火花尝试,每个执行器处理该数据在该节点上本地驻留。因此,2-3位执行者正在工作很长一段时间,而其他执行者只需在几毫秒内完成数据。这就是为什么我遇到了我在上述问题中描述的问题。
调试这个问题的办法是首先的检查RDD的分区大小。如果一个或几个分区与其他分区相比非常大,那么下一步就是在大分区中查找记录,这样您就可以知道,特别是在发生偏斜连接的情况下,哪个键会发生偏斜。我已经写了一个小功能调试此:
from itertools import islice
def check_skewness(df):
sampled_rdd = df.sample(False,0.01).rdd.cache() # Taking just 1% sample for fast processing
l = sampled_rdd.mapPartitionsWithIndex(lambda x,it: [(x,sum(1 for _ in it))]).collect()
max_part = max(l,key=lambda item:item[1])
min_part = min(l,key=lambda item:item[1])
if max_part[1]/min_part[1] > 5: #if difference is greater than 5 times
print 'Partitions Skewed: Largest Partition',max_part,'Smallest Partition',min_part,'\nSample Content of the largest Partition: \n'
print (sampled_rdd.mapPartitionsWithIndex(lambda i, it: islice(it, 0, 5) if i == max_part[0] else []).take(5))
else:
print 'No Skewness: Largest Partition',max_part,'Smallest Partition',min_part
它给我的最小和最大分区的大小,如果这两者之间的差值超过5倍,它打印的最大分区的5个元素,应该给你一个大概的想法。
一旦你发现问题是偏斜的分区,你可以找到一种方法来摆脱偏斜的关键,或者你可以重新分区你的数据框,这将迫使它得到平均分配,现在看到所有的执行者都会在相同的时间内工作,并且你会看到更少的可怕OOM错误,并且处理速度也会非常快。
这些仅仅是我作为Spark新手的两分钱,我希望Spark专家可以在这个问题上增加一些内容,因为我认为Spark世界中的很多新手常常面临类似的问题。