1
使用Spark 2.1,我有一个函数,它需要一个DataFrame
并检查所有记录是否在给定的数据库(在这种情况下是Aerospike)。火花累加器计数不正确?
它看起来非常像这样:
def check(df: DataFrame): Long = {
val finalResult = df.sparkSession.sparkContext.longAccumulator("finalResult")
df.rdd.foreachPartition(iter => {
val success = //if record is on the database: 1 else: 0
//if success = 0, send Slack message with missing record
finalResult.add(success)
}
df.count - finalResult.value
}
所以,松弛的消息的数量应与该函数返回(丢失记录的总数)的数目,但往往这种情况并非如此 - 例如,我得到一条Slack消息,但是check = 2
。重新运行它提供了check = 1
。
任何想法发生了什么?
那么为什么我只收到一条Slack消息?如果它被处理了两次,那么我应该有两条消息 – shakedzy
嗯,对不起,不确定那么当我没有使用数据帧太多时,它应该是所有的在一个分区上的数据,而不是只有一个记录,你确定你的成功只能是1或0吗?除此之外,我什么也没有 – SiLaf
我认为这是不正确的,因为他在foreach是一个动作,火花保证累加器将被更新一次,因此估价人员应该是正确的。累加器仅在阶段完成时报告,因此当在动作中运行时,即使需要重新运行,部分结果也不会影响最终值。 – puhlen