我正在开发一个侦听消息队列(ActiveMQ)的node.js组件,并将收到的消息分批添加到redis(每批必须为20个) 。Node.js应用程序侦听消息队列并将消息异步添加到redis
从ActiveMQ接收的消息数量为每秒10个或更少时没有问题。
我的问题是消息每4毫秒添加到队列中。这会导致添加到批次的记录数有时会超过每批20个。
const stompit = require('stompit');
var uuid = require('node-uuid');
var Redis = require('ioredis');
var redis = new Redis();
var pipeline = redis.pipeline();
var batchCounter = 0;
stompit.connect({ host: 'localhost', port: 61613 }, function(err1, client) {
client.subscribe({ destination: 'MyQueue' }, function(err2, msg) {
msg.readString('UTF-8', function(err3, body) {
if (batchCounter >= 20){
pipeline.exec(function(err4, results) {
pipeline = redis.pipeline();
batchCounter = 0;
console.log(results);
});
}
batchCounter++;
pipeline.set(uuid.v1(), JSON.stringify(body));
//client.disconnect();
});
});
});
如何解决这个问题? 谢谢
它仍然是可能的,你可以用多条管道会在同一时间Redis的结束。如果这也会导致问题,您可能需要一个fullpipelines队列来处理。 – Matt