2013-04-02 78 views
0

我有一个Node.js的流,我暂时写入到一个数组是这样的:如何并发写入流并读取node.js中的数据?

var tempCrossSection = []; 

stream.on('data', function(data) { 
    tempCrossSection.push(data); 
}); 

然后我定期地采集数据数组中(和清除它),并在其上做一些处理这样的:

var crossSection = []; 

setInterval(function() { 
    crossSection = tempCrossSection; 
    tempCrossSection = []; 

    someOtherFunction(crossSection, function(data) { 
     console.log(data); 
    } 
}, 30000); 

的问题是,我得到一些奇怪的行为与该流被写入到阵列和被解雇作为流率的增加和/或someOtherFunction回调需要的setInterval回调的数量顺序太长。

我应该如何实现此,使得流被正确地将数据写入到所述阵列(按顺序),并正在每setInterval的回调进行一次的数据处理。

+0

我不知道我理解。顺序将始终与来自数据流的数据顺序相同,只是推入数组中。代码如何被调用?错误似乎是在别的地方。 – freakish

+1

有两件事情可以做:在someOtherFunction回调,而不是setInterval的使用的setTimeout和'截面= tempCrossSection.splice(0)'更有意义清除阵列。不知道这些与你的问题有什么关系。 –

+0

@freakish我也有同样的期望,但在某些情况下,我不得不控制台陈述反映的是不同的(不是按顺序大)array.length(S) – TankofVines

回答

1

有你的代码的几个问题。首先你要分享到很多国家。 例如横截面应在匿名Interval函数来限定。 为什么“crossSection”定义为闭包?如果其他功能运行很长一段时间,您可能确实会遇到某种竞争条件。

var source = []; 

stream.on('data', function(data) { 
    source.push(data); 
}); 

setInterval(function() { 
    var target = source; 
    source = []; 

    someOtherFunction(target, function(data) { 
     console.log(data); 
    } 
}, 30000); 

如果你有机会获得someOtherFunction然后我会重写这样

var source = []; 

stream.on('data', function(data) { 
    source.push(data); 
}); 

setInterval(function() { 
    var processing = true; 

    while (processing) { 
     var elem = source.shift(); 
     someOtherFunction(elem, function(data) { 
      console.log(data); 
     }); 
     processing = checkForBreakConditionAndReturnFalseIfBreak(); 
    } 
}, 30000); 

仍然是整个事情,你可能会遇到一些问题,如果元素的数量是大和someOtherFunctions需要很长时间。所以我可能会这样做

var source = []; 
var timerId = 0; 

stream.on('data', function(data) { 
    source.push(data); 
}); 

function processSource() { 
    clearTimeout(timerId); 
    var processing = true; 

    while (processing) { 
     var elem = source.shift(); 
     someOtherFunction(elem, function(data) { 
      console.log(data); 
     }); 
     processing = checkForBreakConditionAndReturnFalseIfBreak(); 
    } 
    setTimeout(processSource, calcTimeoutForNextProcessingDependentOnPastData()); 
}; 

setTimeout(processSource, 30000); //initial Timeout 
+0

感谢提示有一些好东西在那里。我遇到了一些快速启动后续流的问题,这是我的问题的根源。 – TankofVines