2016-11-10 192 views
4

考虑下面的代码:为什么我没有看到Kafka Streams reduce方法的输出?

KStream<String, Custom> stream = 
    builder.stream(Serdes.String(), customSerde, "test_in"); 

stream 
    .groupByKey(Serdes.String(), customSerde) 
    .reduce(new CustomReducer(), "reduction_state") 
    .print(Serdes.String(), customSerde); 

我有减速机,成功地打印出时,我想到的减少发生的方法适用于内println声明。但是,上面显示的最终打印语句不显示任何内容。同样如果我使用to方法而不是print,我在目标主题中看不到消息。

reduce语句后需要什么来查看减少的结果?如果一个值被推送到输入,我不期望看到任何东西。如果使用同一个键的第二个值被推送,我期望reducer被应用(它所做的),我也期望减少的结果继续到处理管道中的下一个步骤。如上所述,我在管道的后续步骤中没有看到任何东西,我不明白为什么。

+2

尝试设置'StreamConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG'到值0。 –

+0

@ MatthiasJ.Sax由于!这为我解决了这个问题,请随时发布它作为答案,以便我可以给你奖励点数。如果你可以包含一个关于这个配置细节和其他喜欢它的更多信息的链接,我也会很感激它。 – LaserJesus

回答

7

从Kafka 0.10.1.0开始,所有聚合运算符都使用内部重复数据删除缓存来减少结果KTable更改日志流的负载。例如,如果您使用同一个键直接计数并处理两个记录,则完整的更新日志流将为<key:1>, <key:2>

使用新的缓存功能,缓存将接收到<key:1>并存储它,但不会立即向下游发送。计算<key:2>时,它将替换缓存的第一个条目。根据缓存大小,不同密钥的数量,吞吐量以及提交间隔,缓存会向下游发送条目。这发生在单个密钥条目的缓存逐出或缓存的完全刷新(发送所有下游条目)时发生。因此,KTable更新日志可能只显示<key:2>(因为<key:1>得到了重复删除)。

您可以通过Streams配置参数StreamConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG来控制缓存的大小。如果将值设置为零,则完全禁用缓存,并且KTable更新日志将包含所有更新(有效提供0.10.1.0行为)。

汇合的文档包含的截面说明更详细地缓存:

相关问题