2

我正在尝试使用Spark流(每秒从Kafka读取)进行汇总和查找一些指标。我可以汇总该特定分钟的数据。我如何确保当天可以有桶,并总结当天所有分钟的总值?来自Spark流中不同微批量的汇总数据

我有一个数据框,我正在做类似的事情。

sampleDF = spark.sql("select userId,sum(likes) as total from likes_dataset group by userId order by userId") 

回答

1

您可以使用“Watermarking”功能,从结构化编程流

示例代码

import spark.implicits._ 

val words = ... // streaming DataFrame of schema { timestamp: Timestamp, word: String } 

    val windowedCounts = words 
     .withWatermark("timestamp", "10 minutes") 
     .groupBy(
      window($"timestamp", "10 minutes", "5 minutes"), 
      $"word") 
     .count() 
+1

谢谢您的回答。我试过了。 Spark不保留以前的微批次价值。如果我有60秒的微量批处理间隔,并且如果我尝试创建10分钟的窗口,则12:01:00处的值不会与12:02:00的值进行汇总。对于12:02:00,它只能找到最近收到的数据的聚合。如何存储所有10分钟的汇总数据? – Passionate

+1

我有从卡夫卡流获取数据的主要功能。并且,它为每个RDD调用一个函数。在这个函数里面,我会汇总这些值。但是对于每个RDD,聚合值都会重置。以前的汇总值不会保留。我不知道如何为此Spark会话生命定义一个全局聚合数据框架,并合并所有聚合数据。有人可以帮忙吗? – Passionate

1

我想通了,这是怎么回事。我在Spark中了解了有状态的流媒体并帮助了我。

所有我必须做的是,

running_counts = countStream.updateStateByKey(updateTotalCount, initialRDD=initialStateRDD) 

,我不得不写这updateTotalCount功能说如何合并微一批新的汇总数据旧的汇总数据。在我的情况下,更新功能如下所示:

def updateTotalCount(currentCount, countState): 
    if countState is None: 
     countState = 0 
    return sum(currentCount) + countState 
+0

感谢您的回答。你能解释一下'countState',它有什么?还有你的'countStream'有一个RDD流?你如何获得'initialRDD'和'initialStateRDD'? – Jordon