2017-02-14 69 views
0

我正在开发一个侦听消息队列(ActiveMQ)的node.js组件,并将收到的消息分批添加到redis(每批必须为20个) 。Node.js应用程序侦听消息队列并将消息异步添加到redis

从ActiveMQ接收的消息数量为每秒10个或更少时没有问题。

我的问题是消息每4毫秒添加到队列中。这会导致添加到批次的记录数有时会超过每批20个。

const stompit = require('stompit'); 
var uuid = require('node-uuid'); 
var Redis = require('ioredis'); 

var redis = new Redis(); 
var pipeline = redis.pipeline(); 
var batchCounter = 0; 

stompit.connect({ host: 'localhost', port: 61613 }, function(err1, client) { 

client.subscribe({ destination: 'MyQueue' }, function(err2, msg) { 
    msg.readString('UTF-8', function(err3, body) { 

     if (batchCounter >= 20){ 
      pipeline.exec(function(err4, results) { 
       pipeline = redis.pipeline(); 
       batchCounter = 0; 
       console.log(results); 
      }); 
     } 

     batchCounter++; 
     pipeline.set(uuid.v1(), JSON.stringify(body)); 
     //client.disconnect(); 
     }); 
    }); 
    }); 

如何解决这个问题? 谢谢

回答

0

尝试在调用.exec方法之前重置管道,我认为这是一种异步方法。由于.exec将来会在某个时间点运行,因此增量和pipeline.set可以在它之前运行。

下面的代码保存当前的管道和前.exec

if (batchCounter >= 20){ 
    let fullpipeline = pipeline; 
    pipeline = redis.pipeline(); 
    batchCounter = 0; 
    fullpipeline.exec(function(err4, results) { 
     console.log(err4, results); 
    }); 
} 

然后新的消息应该那么只有附加到新的管道同步创建一个新文件。

+0

它仍然是可能的,你可以用多条管道会在同一时间Redis的结束。如果这也会导致问题,您可能需要一个fullpipelines队列来处理。 – Matt