2017-06-19 322 views
1

只是为了学习的目的,我试图设置一个字典作为累加器中的全局变量添加功能效果很好,但我运行代码并将字典放在地图函数中,它总是返回空。pyspark中的累加器与字典作为全局变量

但是设置列表作为一个全局变量

class DictParam(AccumulatorParam): 
    def zero(self, value = ""): 
     return dict() 

    def addInPlace(self, acc1, acc2): 
     acc1.update(acc2) 


if __name__== "__main__": 
    sc, sqlContext = init_spark("generate_score_summary", 40) 
    rdd = sc.textFile('input') 
    #print(rdd.take(5)) 



    dict1 = sc.accumulator({}, DictParam()) 


    def file_read(line): 
     global dict1 
     ls = re.split(',', line) 
     dict1+={ls[0]:ls[1]} 
     return line 


    rdd = rdd.map(lambda x: file_read(x)).cache() 
    print(dict1) 
+0

我的问题是地图总是空的 – user3341953

回答

1

我相信print(dict1())只是在rdd.map()之前执行。

火花,有两种类型的operations

  • 转变,即描述未来计算
  • 和行动,也呼吁采取行动,实际上触发执行

累加器仅在some action is executed时更新:

累加器不会更改Spark的懒惰评估模型。如果他们 正在RDD上的操作内更新,则其值仅为 ,因为RDD是作为操作的一部分计算的。

如果检查出的文档的本节结束时,有一个例子恰好喜欢你:

accum = sc.accumulator(0) 
def g(x): 
    accum.add(x) 
    return f(x) 
data.map(g) 
# Here, accum is still 0 because no actions have caused the `map` to be computed. 

所以,你会需要添加一些动作,例如:

rdd = rdd.map(lambda x: file_read(x)).cache() # transformation 
foo = rdd.count() # action 
print(dict1) 

请确保检查各种RDD功能和累加器特性的细节,因为这可能会影响结果的正确性。 (例如,rdd.take(n)默认为only scan one partition,不是整个数据集。)

+0

谢谢,我现在会尝试。 – user3341953

1

对于范围内的行为进行蓄能器的更新类似的代码只,其价值是 只更新一次该RDD计算作为行动的一部分

+0

谢谢你的回应。我不太明白,为什么在我的代码中,字典作为全局变量没有更新,一直是空的?我厌倦列表案例,它运作良好。你能解释更多吗?在此先感谢 – user3341953