2017-04-25 64 views
0

如下所示,我的代码是一个高层次的消费者在kafka服务器中获取32个分区的主题,我很困惑,为什么有时我会从consumer.poll()中获得一个空的返回值。 我试图增加轮询超时,然后当我增加超时到1000,然后每个轮询都有返回数据,而我设置超时到10或0,然后我看到很多空回报。当卡夫卡消费者调查返回空记录?

任何人都可以告诉我如何设置正确的超时?

def main(args: Array[String]): Unit = { 
    val props = new Properties() 
    props.put("bootstrap.servers", "kafka-01:9098") 
    props.put("group.id", "kch1") 
    props.put("enable.auto.commit", "true") 
    props.put("auto.commit.interval.ms", "1000") 
    props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer") 
    props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer") 

    //props.put("max.poll.records", "1000") 
    val consumers = new Array[KafkaConsumer[String, String]](16) 
    for(i <- 0 to 15) { 
     consumers(i) = new KafkaConsumer[String, String](props) 
     consumers(i).subscribe(util.Arrays.asList("veh321")) 
    } 
    var cnt = 0 
    var cacheIterator: Iterator[ConsumerRecord[String, String]] = null 
    for(i <- 0 to 15) { 
     new Thread(new Runnable { 
     override def run(): Unit = { 
      var finish = false 
      while(!finish) { 
      val start = System.currentTimeMillis() 
      cacheIterator = consumers(i).poll(100).iterator() 
      val end = System.currentTimeMillis() - start 
      if (end > 10) { 
       println(s"${Thread.currentThread().getId} + Duration is ${end}, ${cacheIterator.hasNext} ${cacheIterator.size}") 
      } 
      } 
     } 
     }).start() 
    } 

回答

0

Java的消费者采用的Linux的epoll的是通过调用java.nio.channels.Selector.select(超时)底层实现方案。如果仅在100 ms内尝试在短时间间隔内准备好多少个SelectionKeys,则很可能不会返回任何内容。

此外,在同样的100毫秒内,消费者会做其他工作,包括轮询协调员状态,所以记录轮询的实时间隔明显小于100毫秒,这使得难以检索到一些真正有用的东西。

+0

因此,如果我将轮询时间设置为1000毫秒,我可以看到所有的轮询都有数据返回,其中一些花费大约200毫秒,而其他花费几毫秒,那么当轮询花费需要几百秒时会发生什么?有没有任何参数来扭转这种情况? –

+0

许多底层操作需要在检索记录之前完成,例如获取元数据,管理连接和组等。如果您正在使用Java消费者,则可以调整'max.poll.records'来控制单轮轮询的消息数。 – amethystic

+0

感谢您的帮助,所以如果我想要一个很大的吞吐量,我可以设置一个大poll.records和一个大轮询超时,以减少民意调查的时间,我是对吗? –