2017-08-30 42 views
0

我开始尝试卡夫卡数据流。我遵循https://kafka.apache.org/0110/documentation/streams/quickstart卡夫卡字数未更新计数

我的沙盒是一个运行Ubuntu 16.04.2 LTS,Kafka 0.11.0.0和Scala 2.11.11的盒子。

作为卡夫卡流快速启动指南中说明,这里有我跟着步骤:

echo -e "all streams lead to kafka\nhello kafka streams\njoin kafka summit" > file-input.txt 

bin/kafka-topics.sh --create \ 
    --zookeeper localhost:2181 \ 
    --replication-factor 1 \ 
    --partitions 1 \ 
    --topic streams-file-input 

bin/kafka-console-producer.sh --broker-list localhost:9092 --topic streams-file-input < file-input.txt 

bin/kafka-run-class.sh org.apache.kafka.streams.examples.wordcount.WordCountDemo 

bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 \ 
    --topic streams-wordcount-output \ 
    --from-beginning \ 
    --formatter kafka.tools.DefaultMessageFormatter \ 
    --property print.key=true \ 
    --property print.value=true \ 
    --property key.deserializer=org.apache.kafka.common.serialization.StringDeserializer \ 
    --property value.deserializer=org.apache.kafka.common.serialization.LongDeserializer 

当在流-单词计数输出通过使用后者命令来看,我的标准输出显示以下内容:

all 1 
streams 1 
lead 1 
to 1 
kafka 1 
hello 1 
kafka 2 
streams 2 
join 1 
kafka 3 
summit 1 

然后,在不中断的bin/kafka-console-consumer.sh命令,我重新运行控制台制片如下:

bin/kafka-console-producer.sh --broker-list localhost:9092 --topic streams-file-input < file-input.txt 

我很惊讶标准输出不会改变,以反映这个新增加的导致的变化。在我的理解中,file-input.txt被用来产生额外的数据,所以字数应该刷新(所有的令牌现在应该被计数两次)。 我的推理有什么问题?

+0

当然,'bin/kafka-run-class.sh org.apache.kafka.streams.examples.wordcount.WordCountDemo'在整个过程中仍在运行?只是为了仔细检查,你还应该在'streams-file-input'主题上运行一个消费者,以确保你真的在那里添加新的值... –

+0

哦,哦......我没有注意到WordCountDemo没有运行了。再次运行它的输出看起来是正确的。谢谢 !但是,在5秒之后,bin/kafka-run-class.sh org.apache.kafka.streams.examples.wordcount.WordCountDemo停止。根据我的理解,这是假设永远运行。我错过了什么? – SCO

回答