2016-07-05 173 views
0

我想在pyspark中定义一个类型为List的累加器,并为工作节点累加字符串值。这里是代码我有:spark中的自定义累加器类

class ListParam(AccumulatorParam): 
def zero(self, v): 
    return [] 
def addInPlace(self, acc1, acc2): 
    acc1.extend(acc2) 
    return acc1 

我然后定义该类型的蓄能器,如下

accu = sc.accumulator([], ListParam()) 

,然后不同的值在执行者添加到它如下

accu.add("abc") 

我希望值abc在累加器中只显示一个值,但累加器会添加三个不同的值(一个pr字符),当我查看accu在驱动程序中的值看起来像['a','b','c']。我如何改变它,使它不会将每个字符作为累加器中的单独条目添加?

--------------编辑----------------

我定义的另一个自定义类为我的蓄电池如下

class VectorAccumulatorParam(AccumulatorParam): 
def zero(self, value): 
    return [0.0] * len(value) 
def addInPlace(self, val1, val2): 
    for i in range(len(val1)): 
     val1[i] += val2[i] 
    return val1  

和工人中我有下面的代码

global accu 
accu += [accuracy] 

,但是当我在驱动程序打印accu,它是空的。哪里不对了?

+0

它是如何使用的? – 2016-07-06 16:24:53

回答

0

您是否试图明确告诉spark执行您的操作,处理累加器?正如你应该知道的,spark's operations are lazy,很多时候你需要调用rdd.collect()来实际执行你的映射。