2017-08-17 84 views
0

我正在与的RDD一起工作,它叫做file工人之间的平衡RDD分区 - Spark

#values: RDD of tuples (key, val)  
file = values.groupByKey().mapValues(set).cache() 
info_file = array(file.map(lambda (x,y): len(y)).collect()) 
var = np.var(info_file) #extremely high 
def f(): 
    ... 
file.foreachPartition(f) 

len(y)方差是非常高的,从而使得对集合的约1%(认证用百分方法),使值的集合total = np.sum(info_file)总数的20%。 如果Spark随机随机分配,那么1%的机会很可能落在同一个分区中,从而导致工作人员之间的负载不平衡。

有没有一种方法可以确保“重”元组在分区间分配? 我实际上将file分为两个分区,heavylight,基于的阈值threshold = np.percentile(info_file,99.9)给出,以便分离这组元组,然后重新分区。

light = file.filter(lambda (x,y): len(y) < threshold).cache() 
heavy = file.filter(lambda (x,y): len(y) >= threshold).cache() 

light.foreachPartition(f) 
heavy.foreachPartition(f) 

但获得几乎相同的运行时间。负载可能已经优化,只是想检查我是否可以做更多/更好的事情。

回答

1

您可以使用Ganglia监视群集负载。这应该能够很好地说明可能导致群集负载不均匀的任何数据偏差。

如果您确实有不幸的数据歪斜,可以通过重组数据或抠像键等方式对其进行处理。例如参见this StackOverflow Q&A

注意,你也可以做,你现在在做什么,与数据分割成heavy分区和light分区,但在这种情况下,你要cachefile - 不heavylight - 因为它是file你需要多次处理。就像这样:

cachedFile = file.cache() 

light = cachedFile.filter(lambda (x,y): len(y) < threshold) 
heavy = cachedFile.filter(lambda (x,y): len(y) >= threshold) 

light.foreachPartition(f) 
heavy.foreachPartition(f) 

希望它能帮助。