2017-07-26 61 views
1

我在火花的工作得到一个错误,是令人惊讶的我:如何在减少前避免较大的中间结果?

Total size of serialized results of 102 tasks (1029.6 MB) is 
bigger than spark.driver.maxResultSize (1024.0 MB) 

我的工作是这样的:

def add(a,b): return a+b 
sums = rdd.mapPartitions(func).reduce(add) 

RDD有〜500个分区和FUNC发生在该分区中的行和返回一个大数组(一个1.3M双数或〜10Mb的numpy数组)。 我想总结所有这些结果并返回它们的总和。

Spark似乎将mapPartitions(func)的总结果保存在内存中(大约5GB),而不是逐步处理它,这只需要大约30Mb。

而不是增加spark.driver.maxResultSize,有没有办法减少更多的增量?


更新:其实我有点惊讶,更多的是,这两个结果永远保存在内存中。

回答

3

这里没有什么特别令人惊讶的。在使用reduce时,Spark将对驾驶员进行最终减速。如果func返回一个对象这实际上相当于:

reduce(add, rdd.collect()) 

您可以使用treeReduce

import math 

# Keep maximum possible depth 
rdd.treeReduce(add, depth=math.log2(rdd.getNumPartitions())) 

toLocalIterator

sum(rdd.toLocalIterator()) 

前者会递归合并分区上工作人员以增加网络交换为代价。您可以使用depth参数调整性能。

后者仅在当时收集单个分区,但可能需要重新评估rdd,并且该作业的重要部分将由驱动程序执行。

取决于使用BlockMatrices

func使用还可以由矩阵分割成块,并进行加法逐块,例如提高工作分配的确切逻辑