0

我学习卡夫卡流和我有一个字计数的在Java中8的第一个例子,从文档拍摄的问题。卡夫卡流 - 第一个例子字计数不正确计算第一圈

使用卡夫卡的最新可用版本流,卡夫卡Connect和字计数lambda表达式的例子。

我遵循以下步骤: 我创建卡夫卡输入话题,输出一个。启动应用程序流,然后通过从.txt文件

在第一计数插入一些的话,输出话题我看到正确的分组词语的上传输入话题,但数是错误的。如果我试图重新插入相同的单词,那么以前错误计数的连续计数都是正确的。

如果我寻找与消费者控制台输入话题转储,它的正确加载,并且不存在脏数据。

第一次算错了怎么回事?

实施例[第一DATA]: (卡夫卡输入主题) 喜喜 麦克风麦克风 测试

(应用程序流式传输正在运行)

(输出主题)。在12麦克4试验3(随便计数)

[连续数据 - 发布输入主题中的相同的话]

(输出主题)。在14麦克风6试验4

[新的尝试]

(输出主题)。在16麦克风8试验5

等....

+0

听起来很奇怪。你能否可靠地再现这个问题?这不应该发生。 –

回答

3

在阿帕奇卡夫卡的字计数演示具有the following lines

// setting offset reset to earliest so that we can re-run the demo code with the same pre-loaded data 
// Note: To re-run the demo, you need to use the offset reset tool: 
// https://cwiki.apache.org/confluence/display/KAFKA/Kafka+Streams+Application+Reset+Tool 
props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest"); 

这意味着,当您重新启动应用程序时,它将从最开始(“最早”)开始读取其输入主题iff there is no existi存储在Kafka中的WordCount应用的消费者偏移量。一定程度的应用不活动后,应用的消费者抵消将在卡夫卡过期,默认值为24小时(参见offsets.retention.minutesbroker configuration)。

我能想象得到以下事情发生了:

  • 你尝试了卡夫卡一段时间较早,进入测试数据输入话题。
  • 然后,在恢复实验之前,您先进行了> 24小时的休息。
  • 现在的应用程序,当它重新启动,恢复了从一开始就一路重新读取输入的话题,从而拾起旧的测试输入数据,从而导致“膨胀”计数。

如果我查看使用者控制台的输入主题转储,它正确加载并且没有脏数据。

您可以通过控制台消费者再次在输入题目看,同时增加了CLI选项--from-beginning(见https://kafka.apache.org/documentation/#quickstart_consume)证实了以上我的假设。

$ bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic yourInputTopic --from-beginning 

这将显示主题“yourInputTopic”中的所有可用数据 - 减去可能已在此期间,卡夫卡主题清除所有数据(默认代理配置将清除比旧数据7天,参见)。

+0

谢谢你的回答。实际上,当我在24小时后进行测试(然后是新的偏移量)时,我正在删除旧的主题(我启用了取消)并从头开始重新创建它们以实现新的干净执行。问题再次出现。但是现在我已经在示例代码中添加了linesConfiguration.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG,“最早的”)行为,并且似乎运行良好。也许我还没有解决这个问题,但它的工作原理。 –

+0

太棒了,很高兴听到它现在起作用! –

+0

几周前我有类似的问题,但有时计数是负数。这可能是由类似的东西造成的吗? – foxygen