嗨我一直在试图学习KAFKA,并与我的远程轮询/消费者有问题。KAFKA REMOTE AWS consumer.poll
我在AWS EC2实例中使用私有和公有IP设置了KAFKA。我的server.properties看起来像这样。
听众= PLAINTEXT://172.31.31.58:9092 #AWS专用IP
advertised.listeners = PLAINTEXT:// 35 ?? ?? ??:。9092 #AWS公共IP屏蔽
我的AWS EC2安全组配置为允许通过任何端口上的任何IP进行流量测试。 9092 --topic测试
:当我产生/本地,在我的EC2实例使用下面的脚本将它完美
斌/ kafka-console-producer.sh --broker列表本地主机使用消息
bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic test --from-beginning
但是,当我尝试从我的远程笔记本电脑连接到相同的kafka实例Eclipse代码运行我的java API,我的代码永远在consumer.poll(100)中挂起。我在这里做错了什么吗?
Properties props = new Properties();
props.put("bootstrap.servers", "35.??.??.??:9092");//my aws public ip configured in advertised.listeners
props.put("group.id", "test123");
props.put("enable.auto.commit", "false");
props.put("auto.commit.interval.ms", "1000");
props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
consumer.subscribe(Arrays.asList("test"));
while (true) {
ConsumerRecords<String, String> records = consumer.poll(100);
for (ConsumerRecord<String, String> record : records)
System.out.printf("offset = %d, key = %s, value = %s", record.offset(), record.key(), record.value());
} }
你能张贴您的消费记录文件启动你的客户?看到调试消息会很有帮助。 – PragmaticProgrammer