谁能帮我从下面的queries.I正在使用卡夫卡的客户端 - 0.10.1.1(单节点单代理)auto.create.topics.enable的卡夫卡客户端API的问题
默认值是真的。
1.I正在使用
kafkaProdcuer<String,String> producer> producer...
producer.send(new ProducerRecord<String, String>("my- topic","message"));
producer.close();
发送消息到一个主题为消费:
kafkaConsumer<String,String> consumer....
consumer.subscribe(Arrays.asList("my-topic"));
ConsumerRecords<String, String> records = consumer.poll(200);
while(true){
for (ConsumerRecord<String, String> record : records) {
System.out.println(record.value());
}
}
问题是,当我运行了消费者的第一次,它没有得到的值。我必须运行制作人并再次运行消费者以获取这些值。有时我必须运行3次制作人。 这是为什么这样工作?
2)enable.auto.commit =假
相同的消费者可以读取消息多次,如果enable.auto.commit属性是假的?
3)考虑到在第一次point.How我的消费者的代码,我可以打破循环我的意思是消费者怎么能知道它已经阅读所有邮件,然后调用consumer.close()
在kafka bin中有一个控制台消费者,您可以尝试它,而您自己的消费者无法使用数据。如果可能的话,尝试添加producer.flush()。对于你的问题3,流媒体程序无法知道批处理结束,但你可以设置一个超时线程来监视没有数据消耗的超时。 – Lhfcws
是的,我用bin使用者测试了它,它给出了错误的时间,当获取元数据与相关ID为1时:{my-topic-106 = LEADER_NOT_AVAILABLE}(org.apache.kafka.clients.NetworkClient) – jena84
您是否生成了数据最近在你使用数据之前?默认情况下,Kafka只会将您的数据保存3天。 – Lhfcws