我正在使用kafka节点来使用来自特定Kafka主题的消息。当我重新启动我的节点服务器时,它会按预期启动我的使用者,但默认行为是从偏移量0开始消费,而我的目标是仅接收新消息(即从当前偏移量开始消耗)。我没有找到从API文档中实现这一点的方法。任何人都知道它的支持?kafka-node开始从上一次偏移消耗
谢谢!
我正在使用kafka节点来使用来自特定Kafka主题的消息。当我重新启动我的节点服务器时,它会按预期启动我的使用者,但默认行为是从偏移量0开始消费,而我的目标是仅接收新消息(即从当前偏移量开始消耗)。我没有找到从API文档中实现这一点的方法。任何人都知道它的支持?kafka-node开始从上一次偏移消耗
谢谢!
我问卡夫卡节点github上的问题这个问题(link )并得到了答案。它现在可用(从v0.4.0开始)。以下片段适用于我:
consumerClient = new kafka.Client('localhost:2181');
/* Print latest offset. */
var offset = new kafka.Offset(consumerClient);
offset.fetch([{ topic: 'myTopic', partition: 0, time: -1 }], function (err, data) {
var latestOffset = data['myTopic']['0'][0];
console.log("Consumer current offset: " + latestOffset);
});
var consumer = new kafka.HighLevelConsumer(
consumerClient,
[
{ topic: 'myTopic', partition: 0, fromOffset: -1 }
],
{
autoCommit: false
}
);
干杯!
如果你想只接收新邮件,您可以创建消费者实例之前设置以下属性: auto.offset.reset =最新
我应该怎么做? – ItayB
/*在创建KafkaConsumer实例之前,您必须设置属性。 */ props.setProperty(“auto.offset.reset”,“latest”);/*最早,最新*/ KafkaConsumer , ?> consumer = new KafkaConsumer <>(props); – Hussain
你确定你在谈论JavaScript(node js)API吗?看起来像C++ – ItayB