我有一个连接到3个分区的主题的卡夫卡消费者。只要我从kafka获得记录,我想要捕获偏移量和分区。在重新启动时,我想从最后一次读取的偏移卡夫卡从相同的偏移量重新启动
卡夫卡文档恢复消费者的立场:
的每条记录都有自己的偏移量,所以要管理自己的偏移,你只需要做到以下几点:
配置enable.auto.commit =假
使用偏移提供每个ConsumerRecord保存您 位置。
在重新启动时,使用seek (TopicPartition,long)恢复消费者的位置。
这里是我的示例代码:
constructor{
load data into offsetMap<partition,offset>
initFlag=true;
}
Main method
{
ConsumerRecords<String, String> records = consumer.poll(100);
if(initFlag) // is this correct way to override offset position?
{
seekToPositions(offsetMap);
initFlag=false;
}
while(!shutdown)
{
for (ConsumerRecord<String, String> record : records) {
System.out.printf("offset = %d, key = %s, value = %s%n", record.offset(), record.key(), record.value());
getOffsetPositions();// dump offsets and partitions to db/disk
}
}
}
//get current offset and write to a file
public synchronized Map<Integer, Long> getOffsetPositions() throws Exception{
Map<Integer, Long> offsetMap = new HashMap<Integer, Long>();
//code to put partition and offset into map
//write to disk or db
}
} // Overrides the fetch offsets that the consumer
public synchronized void seekToPositions(Map<Integer, Long> offsetMap) {
//code get partitions and offset from offsetMap
consumer.seek(partition, offset);
}
这是做正确的方法是什么?有没有更好的办法?
感谢您的指点自动分区处理了,是的,我手动操作的分区,如指出,以前我只有1个连接到所有3个分区的消费者,所以我应该没问题 –