2017-07-25 220 views
1

如何计算在卡夫卡发送的主题消息总数有多少,以及消费者当时消费或承诺了多少消息?卡夫卡消费者/生产者API

我initiatting卡夫卡连接器AS-

Map<String, String> kafkaParams = new HashMap<>(); 
kafkaParams.put("metadata.broker.list", "localhost:9092"); 
Set<String> topics = Collections.singleton("mytopic"); 

JavaPairInputDStream<String, String> directKafkaStream = 
KafkaUtils.createDirectStream(ssc, 
    String.class, String.class, StringDecoder.class, StringDecoder.class, 
    kafkaParams, topics); 

加工为 -

directKafkaStream.foreachRDD(rdd -> { 
System.out.println("--- New RDD with " + rdd.partitions().size() 
     + " partitions and " + rdd.count() + " records"); 
rdd.foreach(record -> System.out.println(record._2)); 
}); 

FOR 2秒钟

--- New RDD with 2 partitions and 3 records 
value-1 
value-0 
value-2 
--- New RDD with 2 partitions and 7 records 
value-3 
value-5 
value-7 
value-9 
value-4 
value-6 
value-8 
--- New RDD with 2 partitions and 8 records 
value-11 
value-10 
value-13 
... 

回答

2

你可以试试下面执行命令:

bin/kafka-run-class.sh kafka.tools.GetOffsetShell --broker-list localhost:9092,localhost:9093,localhost:9094 --topic test-topic --time -1 

然后,总结每个分区的所有计数。