2016-05-13 93 views
0

我有一个进程在一个文件夹中生成数据文件,每10秒钟产生一个新文件。nodeJS/asyncJS并行处理动态队列

我还有一个观察者的NodeJS,监控的目录,为新文件中来了。

 const watcher = chokidar.watch(['data_folder']); 

     watcher.on('add', (path, stats)=>{ 
      if (stats && stats.size > 0){ 
       console.log(path); 
       //spawn child_process to do further processing 
       spawn_child_process_to_run(path); 
      } 
     }); 

新文件将被进一步用child_process处理,这可能需要相当长的时间才能完成。

问题是如何对文件进行排队,以便它们可以并行处理,而不会触及nodeJS子进程的数量限制。

回答

0

随着async.queue

var async = require('async');  
var exec = require('child_process').exec; 


var q = async.queue(function (path, callback) { 
    console.log('hello ' + path); 
    exec('ping 127.0.0.1 -n 6 >nul ', (err, stdout, stderr)=>{console.log(stdout);callback()}); //simulate 6sec processing time 
    }, 4); 


    // assign a callback 
    q.drain = function() { 
    console.log('all items have been processed'); 
} 



q.push([1,2,3,4,5,6,7,8],function(){console.log("done");}) 
1

帮助下,你可以使用async库。

async.cargo将是有用的,详细信息here & here

创建具有指定有效载荷的货物物体。添加到货物的任务将被完全处理(达到有效载荷限制)。如果工作人员正在进行中,则该任务将排队等待,直至其可用。一旦工作人员完成了一些任务,就会调用这些任务的每个回调。查看这些动画,了解货物和队列的工作方式。

虽然队列一次只将一个任务传递给一组工作人员,但是货物会将一系列任务传递给单个工人,并在工人完成时重复。

var chokidar = require('chokidar'); 
var async = require('async') 

var cargo = async.cargo(function (tasks, callback) { 
    async.map(tasks,function(task,cb){ 
     console.log('spawn_child_process_to_run(path);',task); 
     cb(); 
    },callback); 
}, 2);// Number of tasks in parallel 

const watcher = chokidar.watch(['data_folder']); 

watcher.on('add', (path, stats)=>{ 
    if (stats && stats.size > 0){ 
     cargo.push(path);//Push payload 
    } 
});