2017-06-22 86 views
1

我有一个连接到3个分区的主题的卡夫卡消费者。只要我从kafka获得记录,我想要捕获偏移量和分区。在重新启动时,我想从最后一次读取的偏移卡夫卡从相同的偏移量重新启动

卡夫卡文档恢复消费者的立场:

的每条记录都有自己的偏移量,所以要管理自己的偏移,你只需要做到以下几点:

配置enable.auto.commit =假

使用偏移提供每个ConsumerRecord保存您 位置。

在重新启动时,使用seek (TopicPartition,long)恢复消费者的位置。

这里是我的示例代码:

constructor{  
    load data into offsetMap<partition,offset> 
    initFlag=true; 
} 

Main method 
{ 
    ConsumerRecords<String, String> records = consumer.poll(100); 
    if(initFlag) // is this correct way to override offset position? 
    { 
     seekToPositions(offsetMap); 
     initFlag=false; 
    } 
    while(!shutdown) 
    { 
     for (ConsumerRecord<String, String> record : records) { 
       System.out.printf("offset = %d, key = %s, value = %s%n", record.offset(), record.key(), record.value()); 
       getOffsetPositions();// dump offsets and partitions to db/disk 
     } 
    } 
} 

//get current offset and write to a file 
public synchronized Map<Integer, Long> getOffsetPositions() throws Exception{ 

    Map<Integer, Long> offsetMap = new HashMap<Integer, Long>(); 
    //code to put partition and offset into map 
    //write to disk or db 

    } 
} // Overrides the fetch offsets that the consumer 

public synchronized void seekToPositions(Map<Integer, Long> offsetMap) { 
      //code get partitions and offset from offsetMap 
      consumer.seek(partition, offset); 

    } 

这是做正确的方法是什么?有没有更好的办法?

回答

0

这是确定对我来说,只是要小心你的消费者是如何建立(手动分区分配或自动)

如果分区分配自动完成需要特别小心处理的情况下分区分配变化。这可以通过在对订阅(Collection,ConsumerRebalanceListener)和订阅(Pattern,ConsumerRebalanceListener)的调用中提供ConsumerRebalanceListener实例来完成。例如,当从消费者获取分区时,消费者将通过实施ConsumerRebalanceListener.onPartitionsRevoked(集合)来提交这些分区的偏移量。当分区分配给消费者时,消费者需要查找这些新分区的偏移量,并通过实现ConsumerRebalanceListener.onPartitionsAssigned(Collection)将消费者正确初始化到该位置。

https://kafka.apache.org/0101/javadoc/index.html?org/apache/kafka/clients/consumer/KafkaConsumer.html

+0

感谢您的指点自动分区处理了,是的,我手动操作的分区,如指出,以前我只有1个连接到所有3个分区的消费者,所以我应该没问题 –