2017-06-16 62 views
0

嗨我一直在试图学习KAFKA,并与我的远程轮询/消费者有问题。KAFKA REMOTE AWS consumer.poll

我在AWS EC2实例中使用私有和公有IP设置了KAFKA。我的server.properties看起来像这样。

听众= PLAINTEXT://172.31.31.58:9092 #AWS专用IP

advertised.listeners = PLAINTEXT:// 35 ?? ?? ??:。9092 #AWS公共IP屏蔽

我的AWS EC2安全组配置为允许通过任何端口上的任何IP进行流量测试。 9092 --topic测试

当我产生/本地,在我的EC2实例使用下面的脚本将它完美

斌/ kafka-console-producer.sh --broker列表本地主机使用消息

bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic test --from-beginning

但是,当我尝试从我的远程笔记本电脑连接到相同的kafka实例Eclipse代码运行我的java API,我的代码永远在consumer.poll(100)中挂起。我在这里做错了什么吗?

 Properties props = new Properties(); 
    props.put("bootstrap.servers", "35.??.??.??:9092");//my aws public ip configured in advertised.listeners 
    props.put("group.id", "test123"); 
    props.put("enable.auto.commit", "false"); 
    props.put("auto.commit.interval.ms", "1000"); 
    props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer"); 
    props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer"); 
    KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props); 
    consumer.subscribe(Arrays.asList("test")); 
    while (true) { 
     ConsumerRecords<String, String> records = consumer.poll(100); 
     for (ConsumerRecord<String, String> record : records) 
      System.out.printf("offset = %d, key = %s, value = %s", record.offset(), record.key(), record.value()); 

    } } 
+0

你能张贴您的消费记录文件启动你的客户?看到调试消息会很有帮助。 – PragmaticProgrammer

回答

1

您确定它挂在poll()?或者是poll()只是返回一个空的ConsumerRecords,它在while(true)中循环?

默认情况下,如果您尚未为该组提交任何偏移量,则使用者将从该主题的末尾开始,因此它只会接收新消息。在这种情况下,如果你想在主题中已经消耗的消息,你需要设置auto.offset.resetearliest(如你在控制台消费者与--from-beginning一样)

编辑:

如果它实际上是停留在poll() ,这可能是一个连接问题。为了找出答案,最好的方法是在启用日志记录的情况下运行客户端。创建一个包含文件:

log4j.rootLogger=DEBUG, stdout 
log4j.appender.stdout=org.apache.log4j.ConsoleAppender 
log4j.appender.stdout.layout=org.apache.log4j.PatternLayout 
log4j.appender.stdout.layout.ConversionPattern=[%d] %p %m (%c)%n 

-Dlog4j.configuration=file:PATH_TO_FILE

+0

是的Mickael,它挂在民意调查中,基本上它不会移动到下一行,我已经验证了这一点。 –

+0

嗨米克尔,这肯定是一个网络问题。我早些时候在我的办公室网络中尝试过这种方式,但它不起作用,可以在不同的网络中正常工作事实上,我运行我的程序时启用了调试日志,根据您的建议,并没有记录任何建议任何网络问题。但是现在这种方式可以在不同的网络中运行,并且您关于网络问题的建议会让我想到通过不同的网络来运行它。非常感谢你。 –