我不能让下面的代码工作:对消费者的node-kafka pause()方法。任何工作版本?
"use strict";
let kafka = require('kafka-node');
var conf = require('./providers/Config');
let client = new kafka.Client(conf.kafka.connectionString, conf.kafka.clientName);
let consumer = new kafka.HighLevelConsumer(client, [ { topic: conf.kafka.readTopic } ], { groupId: conf.kafka.clientName, paused: true });
let threads = 0;
consumer.on('message', function(message) {
threads++;
if (threads > 10) consumer.pause();
if (threads > 50) process.exit(1);
console.log(threads + " >>> " + message.value);
});
consumer.resume();
我被终止语句看到控制台和进程退出50个消息。
我想了解的是,这是我的代码破碎或包坏了?或者,也许我只是做错了什么?有没有人能够通过暂停/恢复让卡夫卡消费者工作?我尝试了几个版本的kafka-node,但是它们都以相同的方式运行。谢谢!
当前我使用的是kafka_2.11-0.8.2.1和node-kafka 0.3.2 –