2017-08-02 77 views
0

我玩的卡夫卡流API(Kakfa版本:0.10.2.0)试图做一个简单的wordcount示例工作:Wordcount App gist。我同时运行生产者和消费者的控制台:卡夫卡流字数统计应用程序

./kafka-console-producer.sh -topic input-topic --broker-list localhost:9092

./kafka-console-consumer.sh --topic output-topic --bootstrap-server localhost:9092 --from-beginning 

启动应用程序,一切似乎是工作的罚款,但是当我在控制台内生产一些字符串类型,消费者接受什么都没有。如果我改变了应用程序做对消费者接收流输入一个简单的toUppercase(修改为大写)罚款:

//The following code works fine: val uppercasedWithMapValues: KStream[String, String] = textLines.mapValues(_.toUpperCase()) uppercasedWithMapValues.to("output-topic")

有谁知道为什么我的字计数例如接收什么?我应该在消费者上指定任何序列化程序吗?在我的最后一次测试控制台消费者处理,我通过控制台发送,但并没有表现出他们的消息,请参阅下面的输出:

➜ bin ./kafka-console-consumer.sh \ 
      --topic output-topic \ 
      --bootstrap-server localhost:9092 \ 
      --from-beginning                     
[2017-08-02 07:48:20,187]WARN Error while fetching metadata with correlation id 2 : 
{output-topic=LEADER_NOT_AVAILABLE} (org.apache.kafka.clients.NetworkClient) 
[2017-08-02 07:48:20,197] WARN The following subscribed topics are not assigned 
to any members in the group console-consumer-91651 : [output-topic] 
(org.apache.kafka.clients.consumer.internals.ConsumerCoordinator) 

^CProcessed共有7级的消息

回答

2

KStream的作品,因为它不使用缓存。对于KTable你必须等一下,或者将cache.max.bytes.buffering设置为0(但不是在生产代码!)

+0

太棒了!这就是诀窍!非常感谢你!我想我需要阅读更多关于kafka流内部的内容。再次感谢你@Arek – ardlema

+0

我很高兴帮助你@ardlema :) – Arek