2016-06-21 97 views
2

我正在使用kafka节点来使用来自特定Kafka主题的消息。当我重新启动我的节点服务器时,它会按预期启动我的使用者,但默认行为是从偏移量0开始消费,而我的目标是仅接收新消息(即从当前偏移量开始消耗)。我没有找到从API文档中实现这一点的方法。任何人都知道它的支持?kafka-node开始从上一次偏移消耗

谢谢!

回答

4

我问卡夫卡节点github上的问题这个问题(link )并得到了答案。它现在可用(从v0.4.0开始)。以下片段适用于我:

consumerClient = new kafka.Client('localhost:2181'); 

/* Print latest offset. */ 
var offset = new kafka.Offset(consumerClient); 

offset.fetch([{ topic: 'myTopic', partition: 0, time: -1 }], function (err, data) { 
     var latestOffset = data['myTopic']['0'][0]; 
     console.log("Consumer current offset: " + latestOffset); 
}); 

var consumer = new kafka.HighLevelConsumer(
     consumerClient, 
     [ 
      { topic: 'myTopic', partition: 0, fromOffset: -1 } 
     ], 
     { 
      autoCommit: false 
     } 
); 

干杯!

-1

如果你想只接收新邮件,您可以创建消费者实例之前设置以下属性: auto.offset.reset =最新

+0

我应该怎么做? – ItayB

+0

/*在创建KafkaConsumer实例之前,您必须设置属性。 */ props.setProperty(“auto.offset.reset”,“latest”);/*最早,最新*/ KafkaConsumer consumer = new KafkaConsumer <>(props); – Hussain

+0

你确定你在谈论JavaScript(node js)API吗?看起来像C++ – ItayB