我是卡夫卡0.9的新手,并测试了一些功能,我意识到Java实现的消费者(KafkaConsumer
)中有一个奇怪的行为。卡夫卡消费者的民意调查()方法被阻止
卡夫卡经纪人位于Ambari外部机器。即使你我可以实现一个生产者并开始向外部代理发送消息,我不知道为什么,当消费者试图读取事件(轮询)时,它会卡住。
我知道生产者工作得很好,因为我可以通过控制台消费者(在ambari上本地工作)消费消息。但是,当我执行Java消费者时,什么也没有发生,只是卡住了。调试代码,我可以看到它被挡在poll()
行:
ConsumerRecords<String, String> records = consumer.poll(100);
超时什么也不做,顺便说一句。如果您将0,100或1000毫秒设置为0,则无关紧要,消费者在此行中被阻止,并且不会超时也不会抛出异常。
我尝试了所有种类的替代性,如advertised.host.name,advertised.listener,...等等,零运气。
任何帮助将不胜感激。提前致谢!
你能否像使用'kafka-console-consumer.sh'一样以不同的方式使用消息? –
是的,我是。从承载ambari的机器,我可以通过控制台消费者使用消息 –
从机器运行消费者的情况如何?你有没有在那里尝试控制台用户? –