2016-03-08 143 views
0

我不能让下面的代码工作:对消费者的node-kafka pause()方法。任何工作版本?

"use strict"; 

let kafka = require('kafka-node'); 
var conf = require('./providers/Config'); 

let client = new kafka.Client(conf.kafka.connectionString, conf.kafka.clientName); 
let consumer = new kafka.HighLevelConsumer(client, [ { topic: conf.kafka.readTopic } ], { groupId: conf.kafka.clientName, paused: true }); 

let threads = 0; 

consumer.on('message', function(message) { 
    threads++; 

    if (threads > 10) consumer.pause(); 

    if (threads > 50) process.exit(1); 

    console.log(threads + " >>> " + message.value); 
}); 

consumer.resume(); 

我被终止语句看到控制台和进程退出50个消息。

我想了解的是,这是我的代码破碎或包坏了?或者,也许我只是做错了什么?有没有人能够通过暂停/恢复让卡夫卡消费者工作?我尝试了几个版本的kafka-node,但是它们都以相同的方式运行。谢谢!

+0

当前我使用的是kafka_2.11-0.8.2.1和node-kafka 0.3.2 –

回答

1

你已经在你的代码中使用了pauseresume,所以显然他们工作。 ;)

这是因为pause不会暂停消息的消耗。它暂停提取消息。我猜你在收到第一封邮件之前已经拿到了第一个50码,并致电pause

踢,我只是测试pause(),并在节点REPL resume()和他们的工作预期:

var kafka = require('kafka-node'); 
var client = new kafka.Client('localhost:2181'); 
var consumer = new kafka.HighLevelConsumer(client, [{topic: 'MyTest'}]); 
consumer.on('message', (msg) => { console.log(JSON.stringify(msg)) }); 

然后我去到另一个窗口,然后运行:

bin/kafka-console-producer.sh --broker-list localhost:9092 --topic MyTest 

并键入一些东西,并显示在第一个窗口中。然后在第一个窗口中输入:consumer.pause();并在第二个窗口中输入更多内容。第一个窗口中没有显示。然后我在第一个窗口中运行consumer.resume(),并显示延迟的消息。

顺便说一句,你应该可以玩卡夫卡配置属性fetch.message.max.bytes并控制一次可以提取多少条消息。例如,如果您的固定宽度消息为500字节,请将fetch.message.max.bytes设置为小于1000(但大于500!)的值,以便每次只读取一条消息。但请注意,这可能无法完全解决问题 - 我对Node相当陌生,但它是异步的,并且我怀疑在完全(或根本)处理第一次提取之前,可能会启动第二次提取。

+0

您是绝对正确的。我玩fetchMaxBytes,发现相对于这个值我获取40-100消息。然后它暂停。所以技术上它是有效的。工程不好,但工程。谢谢! –