2017-06-23 33 views
0

卡夫卡消费者民意调查api没有返回记录到低超时。 如果我增加poll中的超时值,那么记录即将到来。 我无法理解这个逻辑。请按照以下代码进行帮助:卡夫卡cosumer民意调查没有返回记录的低超时

public ConsumerRecords<String, Map<String, String>> subscribeToQueue(String topic, QueueListener q) { 
    Properties props = new Properties(); 
    props.put("bootstrap.servers", "localhost:9092"); 
    props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer"); 
    props.put("value.deserializer", "com.intuit.eventcollection.queue.KafkaJsonDeserializer"); 
    props.put("group.id", "test"); 
    props.put("enable.auto.commit", "true"); 
    props.put("auto.commit.interval.ms", "1000"); 
    props.put("session.timeout.ms", "30000"); 
    props.put("auto.offset.reset", "earliest"); 

    // Figure out where to start processing messages from 
    KafkaConsumer<String, Map<String, String>> kafkaConsumer = new KafkaConsumer<String, Map<String, String>>(
      props); 
    kafkaConsumer.subscribe(Arrays.asList(topic)); 
    ConsumerRecords<String, Map<String, String>> records = null; 
    // Start processing messages 
    try { 
     records = kafkaConsumer.poll(100); 

回答

0

如果在指定为轮询超时(timeout)的时间段内没有发布新消息,则轮询将不返回任何内容。

+0

我看到超时定义为: - timeout - 如果数据不可用,等待轮询的时间(以毫秒为单位)。如果为0,则立即返回任何现在可用的记录。一定不能是负面的。在测试中,我一直在主题和相同流中发送数据,我试图将超时时间为100的消息使用。但是没有任何内容从主题返回。但是,当我将超时时间增加到200时,消息开始从主题出发。正如你的回复“在这段时间内没有发布任何未消费的新消息”,我在测试时无法理解。 – vidya