2017-05-26 90 views
1

使用Spark 2.1,我有一个函数,它需要一个DataFrame并检查所有记录是否在给定的数据库(在这种情况下是Aerospike)。火花累加器计数不正确?

它看起来非常像这样:

def check(df: DataFrame): Long = { 
    val finalResult = df.sparkSession.sparkContext.longAccumulator("finalResult") 
    df.rdd.foreachPartition(iter => { 
     val success = //if record is on the database: 1 else: 0 
     //if success = 0, send Slack message with missing record 
     finalResult.add(success) 
     } 
     df.count - finalResult.value 
    } 

所以,松弛的消息的数量应与该函数返回(丢失记录的总数)的数目,但往往这种情况并非如此 - 例如,我得到一条Slack消息,但是check = 2。重新运行它提供了check = 1

任何想法发生了什么?

回答

0

对于不同工作人员的相同数据,Spark可以多次运行一个方法,这意味着您要计算每个成功次数*在任何工作人员上处理数据的次数。因此,您可以在累加器中获得不同通过同一数据的不同结果。

在这种情况下,您无法使用累加器来确切计数。抱歉。 :(

+0

那么为什么我只收到一条Slack消息?如果它被处理了两次,那么我应该有两条消息 – shakedzy

+0

嗯,对不起,不确定那么当我没有使用数据帧太多时,它应该是所有的在一个分区上的数据,而不是只有一个记录,你确定你的成功只能是1或0吗?除此之外,我什么也没有 – SiLaf

+2

我认为这是不正确的,因为他在foreach是一个动作,火花保证累加器将被更新一次,因此估价人员应该是正确的。累加器仅在阶段完成时报告,因此当在动作中运行时,即使需要重新运行,部分结果也不会影响最终值。 – puhlen