2016-12-29 124 views
0

谁能帮我从下面的queries.I正在使用卡夫卡的客户端 - 0.10.1.1(单节点单代理)auto.create.topics.enable的卡夫卡客户端API的问题

默认值是真的。

1.I正在使用

kafkaProdcuer<String,String> producer> producer... 
    producer.send(new ProducerRecord<String, String>("my- topic","message")); 
    producer.close(); 

发送消息到一个主题为消费:

kafkaConsumer<String,String> consumer.... 
    consumer.subscribe(Arrays.asList("my-topic")); 
    ConsumerRecords<String, String> records = consumer.poll(200); 

    while(true){ 
    for (ConsumerRecord<String, String> record : records) { 
      System.out.println(record.value()); 
     } 
    } 

问题是,当我运行了消费者的第一次,它没有得到的值。我必须运行制作人并再次运行消费者以获取这些值。有时我必须运行3次制作人。 这是为什么这样工作?

2)enable.auto.commit =假

相同的消费者可以读取消息多次,如果enable.auto.commit属性是假的?

3)考虑到在第一次point.How我的消费者的代码,我可以打破循环我的意思是消费者怎么能知道它已经阅读所有邮件,然后调用consumer.close()

+0

在kafka bin中有一个控制台消费者,您可以尝试它,而您自己的消费者无法使用数据。如果可能的话,尝试添加producer.flush()。对于你的问题3,流媒体程序无法知道批处理结束,但你可以设置一个超时线程来监视没有数据消耗的超时。 – Lhfcws

+0

是的,我用bin使用者测试了它,它给出了错误的时间,当获取元数据与相关ID为1时:{my-topic-106 = LEADER_NOT_AVAILABLE}(org.apache.kafka.clients.NetworkClient) – jena84

+0

您是否生成了数据最近在你使用数据之前?默认情况下,Kafka只会将您的数据保存3天。 – Lhfcws

回答

1

1)你总是使用消费者中的同一个group.id?你在食用前是否在生产?这可能与消费者群体和抵消管理有关。请参阅this answer about consumer offset behavior

2)不确定是否意味着有意或无意地读取重复。只要由于主题保留策略而导致邮件未被删除,您总是可以再次阅读相同的邮件以寻找该位置。如果意外,auto-commit设置为false意味着消费者不会为您提供补偿,您必须手动调用commitSync()或commitAsync()。在任何情况下,消费者在提交消息之前都有处理消息并崩溃的机会,在这种情况下,消费者恢复消息时会再次读取已处理但未提交的消息。如果你只想要一次语义,你必须做其他事情,比如用已处理的消息原子地存储偏移量。

3)如提到的Lhfcws,在流中没有像“所有消息”的概念。你可以做的一些事情(技巧)是:

  • 你可以检查如果记录列表由poll返回,如果为空,并且在某些配置的次数后中断循环并退出。
  • 如果订购了消息(您正在从单个分区读取数据),则可以发送一种特殊的END_OF_DATA消息,当您看到消息时,可以关闭消费者。
  • 您可以让消费者读取一些消息,然后退出,下次它将从上次提交的偏移量继续。
+0

感谢Lhfcws和Luciano。我现在明确了第二点和第三点。关于第一点,我在生产者之后立即运行消费者。我没有改变消费者群体。我没有使用bin实用程序创建主题。我假设代码producer.send将创建主题。 bootstrap.servers = localhost:9092 group.id = test enable.auto.commit = true – jena84

+1

jena84,尝试将auto.offset.reset设置为消费者配置中的“最早”,然后重试。此外,在开始消费者等待重新平衡完成之后。 –

+0

太棒了!它的工作。非常感谢。这是因为在链接中提到的原因,你提供了抵消管理。我也试图把“最小”它不允许我。是因为新的消费者API吗? – jena84