2017-01-09 62 views
3

我的Kafka主题包含由deviceId键入的状态。我想使用KStreamBuilder.stream().groupByKey().aggregate(...)仅保留TimeWindow中的最新状态值。我想的是,只要题目由键分区,则聚合功能可以随时以这种方式返回的最新值:卡夫卡流聚合是否有任何订购保证?

(key, value, older_value) -> value

这是一个保证,我可以从卡夫卡流期望?我应该推出自己的检查时间戳的处理方法吗?

回答

4

卡夫卡流保证通过偏移但不时间戳排序。因此,默认情况下,“上次更新获胜”策略基于偏移量,但不以时间戳为基础。迟到的记录(在时间戳上定义的“迟到”)是基于时间戳的乱序,并且它们不会被重新排序以保持原始偏移量的顺序。

如果你想让你的窗口包含基于时间戳的最新值,你将需要使用Processor API(PAPI)来完成这项工作。

在Kafka Streams的DSL中,您无法访问获取正确结果所需的记录时间戳。一个简单的方法可能是在.groupBy()之前加上.transform(),并将时间戳添加到记录(即其值)本身。因此,您可以使用Aggregator中的时间戳(顺便说一句:.reduce()也可以,而不是.aggregate())。最后,您需要在.aggregate()之后执行.mapValues()以再次从该值中删除时间戳。

使用DSL和PAPI的这种混合匹配方法应该可以简化您的代码,因为您可以使用DSL窗口支持和KTable,并且不需要进行低级时间窗口和状态管理。

当然,你也可以在一个单一的低级有状态处理器中完成所有这些,但我不会推荐它。

+1

如果我理解的很好,这意味着排序不保证;) – Steve

+2

我更新了我的问题。有抵消的订单保证基础。 –