2016-07-22 164 views
0

我正在使用Kafka 0.9.0.1。Kafka:从主题消耗第一条消息时间歇性缓慢

我第一次开始了我的应用程序需要20-30秒内从

我用不同的卡夫卡经纪人(具有不同CONFIGS)专题检索“最新”消息,但我仍然看到此行为。随后的消息通常没有缓慢。

这是预期的行为?你可以清楚地看到以下运行此示例应用程序,改变经纪/主题名称你自己的设置

public class KafkaProducerConsumerTest { 

    public static final String KAFKA_BROKERS = "..."; 
    public static final String TOPIC = "..."; 

    public static void main(String[] args) throws ExecutionException, InterruptedException { 
     new KafkaProducerConsumerTest().run(); 
    } 

    public void run() throws ExecutionException, InterruptedException { 
     Properties consumerProperties = new Properties(); 
     consumerProperties.setProperty("bootstrap.servers", KAFKA_BROKERS); 
     consumerProperties.setProperty("group.id", "Test"); 
     consumerProperties.setProperty("auto.offset.reset", "latest"); 
     consumerProperties.setProperty("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer"); 
     consumerProperties.setProperty("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer"); 
     MyKafkaConsumer kafkaConsumer = new MyKafkaConsumer(consumerProperties, TOPIC); 
     Executors.newFixedThreadPool(1).submit(() -> kafkaConsumer.consume()); 

     Properties producerProperties = new Properties(); 
     producerProperties.setProperty("bootstrap.servers", KAFKA_BROKERS); 
     producerProperties.setProperty("value.serializer", "org.apache.kafka.common.serialization.StringSerializer"); 
     producerProperties.setProperty("key.serializer", "org.apache.kafka.common.serialization.StringSerializer"); 

     MyKafkaProducer kafkaProducer = new MyKafkaProducer(producerProperties, TOPIC); 
     kafkaProducer.publish("Test Message"); 
    } 
} 


class MyKafkaConsumer { 
    private final Logger logger = LoggerFactory.getLogger(MyKafkaConsumer.class); 
    private KafkaConsumer<String, Object> kafkaConsumer; 

    public MyKafkaConsumer(Properties properties, String topic) { 
     kafkaConsumer = new KafkaConsumer<String, Object>(properties); 
     kafkaConsumer.subscribe(Lists.newArrayList(topic)); 
    } 

    public void consume() { 
     while (true) { 
      logger.info("Started listening..."); 
      ConsumerRecords<String, Object> consumerRecords = kafkaConsumer.poll(Long.MAX_VALUE); 
      logger.info("Received records {}", consumerRecords.iterator().next().value()); 
     } 
    } 
} 

class MyKafkaProducer { 
    private KafkaProducer<String, Object> kafkaProducer; 
    private String topic; 

    public MyKafkaProducer(Properties properties, String topic) { 
     this.kafkaProducer = new KafkaProducer<String, Object>(properties); 
     this.topic = topic; 
    } 

    public void publish(Object object) throws ExecutionException, InterruptedException { 
     ProducerRecord<String, Object> producerRecord = new ProducerRecord<>(topic, "key", object); 
     Future<RecordMetadata> response = kafkaProducer.send(producerRecord); 
     response.get(); 
    } 

} 

回答

1

,因为当你指定的消费群体开始一个新的消费者的第一条消息应该需要更长的时间比其他人通过consumerProperties.setProperty("group.id", "Test");声明,Kakfka将平衡这些分区,使得每个分区最多被一个消费者使用,并将跨多个消费者进程分配该主题的分区。

此外,卡夫卡0.9还有一个单独的__consumer_offsets主题,卡夫卡用它来管理消费群中每个消费者的偏移量。很可能当你第一次启动消费者时,它会查看这个主题来获取最新的偏移量(可能有一个消费者正在消耗这个话题,而这个消费者会被杀死,因此有必要从正确的偏移量)。

这2个因素会导致消耗第一组消息的延迟更高。我无法评论20-30秒的确切延迟,但我想这应该是默认行为。

PS:准确的数字还可能取决于其他次要因素,例如您是在同一台机器上(不存在网络延迟)的消费者运行代理&,还是在使用TCP进行通信的不同的次要因素。

0

现在很多时候只用最少的日志记录添加就试过了你的代码。以下是一个典型的日志输出:

2016-07-24 15:12:51,417 Start polling...|INFO|KafkaProducerConsumerTest 
2016-07-24 15:12:51,604 producer has send message|INFO|KafkaProducerConsumerTest 
2016-07-24 15:12:51,619 producer got response, exiting|INFO|KafkaProducerConsumerTest 
2016-07-24 15:12:51,679 Received records [Test Message]|INFO|KafkaProducerConsumerTest 
2016-07-24 15:12:51,679 Start polling...|INFO|KafkaProducerConsumerTest 
2016-07-24 15:12:54,680 returning on empty poll result|INFO|KafkaProducerConsumerTest 

事件的顺序如预期的那样及时。消费者开始投票,生产者发送消息并接收结果,消费者接收消息并且以300ms完成所有这些。然后消费者再次开始轮询,并在3秒后抛出,因为我分别更改轮询超时。

我为经纪人和客户端库使用Kafka 0.9.0.1。连接在localhost上,它是一个没有负载的测试环境。

为了完整起见,这里是上面交换机触发的服务器的日志形式。

[2016-07-24 15:12:51,599] INFO [GroupCoordinator 0]: Preparing to restabilize group Test with old generation 0 (kafka.coordinator.GroupCoordinator) 
[2016-07-24 15:12:51,599] INFO [GroupCoordinator 0]: Stabilized group Test generation 1 (kafka.coordinator.GroupCoordinator) 
[2016-07-24 15:12:51,617] INFO [GroupCoordinator 0]: Assignment received from leader for group Test for generation 1 (kafka.coordinator.GroupCoordinator) 
[2016-07-24 15:13:24,635] INFO [GroupCoordinator 0]: Preparing to restabilize group Test with old generation 1 (kafka.coordinator.GroupCoordinator) 
[2016-07-24 15:13:24,637] INFO [GroupCoordinator 0]: Group Test generation 1 is dead and removed (kafka.coordinator.GroupCoordinator) 

您可能想要与您的服务器日志进行同一交换的比较。

+0

感谢您的尝试,我断断续续地看到了这种瞬间行为,但如果您没有请尝试几次,您应该看到延迟。另外,我很欣赏你的理论,但是我在“开始聆听”之后还发布了一秒钟的消息,而且还需要20秒左右 – DJ180

0

根据this link

尝试在你的消费者设置group_id=None,或致电consumer.close() 结束脚本,或使用分配()之前不同意()。否则,您将 重新加入已知但成员没有响应的现有组。 组协调员将等待,直到那些成员签入/离开/超时。 由于消费者不再存在(它是您先前的脚本运行),因此它们有 超时。 和consumer.poll()会在组重新平衡期间阻塞。

因此,如果您与无响应的成员加入群组(可能会不正常地终止应用程序),这是正确的行为。

请确认您在退出应用程序之前调用“consumer.close()”。