5

我是卡夫卡0.9的新手,并测试了一些功能,我意识到Java实现的消费者(KafkaConsumer)中有一个奇怪的行为。卡夫卡消费者的民意调查()方法被阻止

卡夫卡经纪人位于Ambari外部机器。即使你我可以实现一个生产者并开始向外部代理发送消息,我不知道为什么,当消费者试图读取事件(轮询)时,它会卡住。

我知道生产者工作得很好,因为我可以通过控制台消费者(在ambari上本地工作)消费消息。但是,当我执行Java消费者时,什么也没有发生,只是卡住了。调试代码,我可以看到它被挡在poll()行:

ConsumerRecords<String, String> records = consumer.poll(100); 

超时什么也不做,顺便说一句。如果您将0,100或1000毫秒设置为0,则无关紧要,消费者在此行中被阻止,并且不会超时也不会抛出异常。

我尝试了所有种类的替代性,如advertised.host.nameadvertised.listener,...等等,零运气。

任何帮助将不胜感激。提前致谢!

+0

你能否像使用'kafka-console-consumer.sh'一样以不同的方式使用消息? –

+0

是的,我是。从承载ambari的机器,我可以通过控制台消费者使用消息 –

+0

从机器运行消费者的情况如何?你有没有在那里尝试控制台用户? –

回答

1

原因可能是您的消费者代码正在运行的计算机无法连接到动物园管理员。尝试在安装Kafka的计算机上运行相同的客户代码(我试过这个并为我工作)。我也通过提及server.properties文件中的以下属性来解决问题: advertised.host.name="ip address which you want to expose" //在我的情况下,它是公共ip的ec2机器,我有kafka和zookeeper安装在相同的ec2上。 advertised.port=9092 ConsumerRecords<String, String> records = consumer.poll(100); 以上声明并不意味着消费者会在100 ms后超时,这是投票期。无论在100 ms内捕获的数据是否被读入记录集合。