2016-09-29 169 views
0

我想获得卡夫卡消费者的进步,即滞后。我知道下面的命令给了我时滞和其他有价值的描述。使用kafka库查找消费者抵消滞后的代码?

bin/kafka-run-class.sh kafka.admin.ConsumerGroupCommand --zookeeper localhost:2182 --describe --group DemoConsumer

bin/kafka-consumer-groups.sh --zookeeper localhost:2182 --describe --group DemoConsumer

我还可以得到目前消费者使用下面的代码片段与卡夫卡的客户端的帮助抵消

ConsumerRecords<Integer, String> records = consumer.poll(100); 
for (ConsumerRecord<Integer, String> record : records) { 

      System.out.println("Received message: (" + record.topic()+ ", 
      " + record.partition()+ ", " + record.key() + ", " + 
      record.value() + ") at offset " + record.offset()); 
} 

但我不能找到一个代码来获得作为上述两个命令的细节。有没有任何代码可以使用kafka库来查找延迟和其他细节?

回答

0

根据this topic,您可以得到消费者滞后。然而,Maven的依赖关系是不对的主题,它应该是

<dependency> 
    <groupId>org.apache.kafka</groupId> 
    <artifactId>kafka_2.12</artifactId> 
    <version>0.10.2.0</version> 
</dependency> 

而且代码:

AdminClient client = AdminClient.createSimplePlaintext("localhost:9092"); 
Map<TopicPartition, Object> offsets = JavaConversions.asJavaMap(
client.listGroupOffsets("groupID")); 
Long offset = (Long) offsets.get(new TopicPartition("topic", 0)); 
相关问题