我开始尝试卡夫卡数据流。我遵循https://kafka.apache.org/0110/documentation/streams/quickstart。卡夫卡字数未更新计数
我的沙盒是一个运行Ubuntu 16.04.2 LTS,Kafka 0.11.0.0和Scala 2.11.11的盒子。
作为卡夫卡流快速启动指南中说明,这里有我跟着步骤:
echo -e "all streams lead to kafka\nhello kafka streams\njoin kafka summit" > file-input.txt
bin/kafka-topics.sh --create \
--zookeeper localhost:2181 \
--replication-factor 1 \
--partitions 1 \
--topic streams-file-input
bin/kafka-console-producer.sh --broker-list localhost:9092 --topic streams-file-input < file-input.txt
bin/kafka-run-class.sh org.apache.kafka.streams.examples.wordcount.WordCountDemo
bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 \
--topic streams-wordcount-output \
--from-beginning \
--formatter kafka.tools.DefaultMessageFormatter \
--property print.key=true \
--property print.value=true \
--property key.deserializer=org.apache.kafka.common.serialization.StringDeserializer \
--property value.deserializer=org.apache.kafka.common.serialization.LongDeserializer
当在流-单词计数输出通过使用后者命令来看,我的标准输出显示以下内容:
all 1
streams 1
lead 1
to 1
kafka 1
hello 1
kafka 2
streams 2
join 1
kafka 3
summit 1
然后,在不中断的bin/kafka-console-consumer.sh命令,我重新运行控制台制片如下:
bin/kafka-console-producer.sh --broker-list localhost:9092 --topic streams-file-input < file-input.txt
我很惊讶标准输出不会改变,以反映这个新增加的导致的变化。在我的理解中,file-input.txt被用来产生额外的数据,所以字数应该刷新(所有的令牌现在应该被计数两次)。 我的推理有什么问题?
当然,'bin/kafka-run-class.sh org.apache.kafka.streams.examples.wordcount.WordCountDemo'在整个过程中仍在运行?只是为了仔细检查,你还应该在'streams-file-input'主题上运行一个消费者,以确保你真的在那里添加新的值... –
哦,哦......我没有注意到WordCountDemo没有运行了。再次运行它的输出看起来是正确的。谢谢 !但是,在5秒之后,bin/kafka-run-class.sh org.apache.kafka.streams.examples.wordcount.WordCountDemo停止。根据我的理解,这是假设永远运行。我错过了什么? – SCO