2017-03-10 88 views
0

我正在尝试构建pub/sub应用程序,我正在探索最佳工具。我目前正在寻找卡夫卡并有一个小演示应用程序已经运行。但是,我正在遇到一个概念问题。没有消费者连接时,卡夫卡经纪人是否可以保留消息?

我有一个制片人(的Java代码):

String topicName = "MyTopic; 
    String key = "MyKey"; 

    Properties props = new Properties(); 
    props.put("bootstrap.servers", "localhost:9092,localhost:9093"); 
    props.put("acks", "all"); 
    props.put("key.serializer","org.apache.kafka.common.serialization.StringSerializer"); 
    props.put("value.serializer", "org.apache.kafka.common.serialization.ByteArraySerializer"); 
    Producer<String, byte[]> producer = new KafkaProducer <String, byte[]>(props); 

    byte[] data = <FROM ELSEWHERE>; 
    ProducerRecord<String, byte[]> record = new ProducerRecord<String, byte[]>(topicName, key, data); 

    try { 
     RecordMetadata result = producer.send(record).get(); 
    } 
    catch (Exception e) { 
     // Nothing for now 
    } 
    producer.close(); 

当我开始通过Kakfa命令行工具消费者:

kafka-console-consumer --bootstrap-server localhost:9092 --topic MyTopic 

,然后我执行制片人的代码,我看到我的消费者终端上显示的数据信息。

然而,如果我这样做前执行制片人跑消费者,出现一则“丢失”。当我启动消费者(执行生产者之后)时,消费者终端中不会显示任何内容。

有谁知道是否有可能让卡夫卡经纪人保留消息,而没有消费者连接?如果是这样,怎么样?

+0

尝试附加'--from-beginning'来查看您的问题是否消失。这可能是由于默认的偏移重置策略。查看消费者配置中的'auto.offset.reset'的详细信息。 – amethystic

+0

@amethystic是的,它做到了!感谢您的参考。把这个答案,我会接受。它也很高兴地看到它显示了我所有的其他“丢失”消息。 – Brett

+0

在学习期间不要错过这些设置的一个很好的起点是https://kafka.apache.org/quickstart。 – randominstanceOfLivingThing

回答

4

--from-beginning附加到控制台消费者命令,使其从最早的偏移量开始消耗。这实际上是由config auto.offset.reset控制的偏移复位策略。下面是这个配置是指:

怎么办时,没有初始卡夫卡或者如果电流偏移不存在任何更多的服务器(例如因为该数据已被删除)偏移:

earliest:自动复位偏移,偏移最早

latest:自动复位偏移到最新偏移

none:如果没有先前偏移发现为C抛出异常给消费者onsumer的团体 其他:向消费者抛出异常。

+0

正是我需要的。 – Brett