2013-05-08 82 views
13
  • 2流:连接两个(或n)流

    鉴于可读streamsstream1stream2,什么是地道(简洁)的方式来得到stream1stream2级联流?

    我不能做stream1.pipe(outStream); stream2.pipe(outStream),因为然后流内容混杂在一起。

  • Ñ流:

    给定一个EventEmitter发射不确定数目的流,例如

    eventEmitter.emit('stream', stream1) 
    eventEmitter.emit('stream', stream2) 
    eventEmitter.emit('stream', stream3) 
    ... 
    eventEmitter.emit('end') 
    

    什么是一个地道的(简洁)的方式来得到与连接在一起所有流的流?

回答

11

combined-stream包将流连接起来。自述文件中的示例:

var CombinedStream = require('combined-stream'); 
var fs = require('fs'); 

var combinedStream = CombinedStream.create(); 
combinedStream.append(fs.createReadStream('file1.txt')); 
combinedStream.append(fs.createReadStream('file2.txt')); 

combinedStream.pipe(fs.createWriteStream('combined.txt')); 

我相信您必须一次追加所有流。如果队列为空,则combinedStream自动结束。请参阅issue #5

stream-stream库是一个替代方案,它有一个明确的.end,但它不太受欢迎,大概没有经过良好测试。它使用节点0.10的streams2 API(请参阅this discussion)。

3

你也许可以使它更加简洁,但这里有一个工程:

var util = require('util'); 
var EventEmitter = require('events').EventEmitter; 

function ConcatStream(streamStream) { 
    EventEmitter.call(this); 
    var isStreaming = false, 
    streamsEnded = false, 
    that = this; 

    var streams = []; 
    streamStream.on('stream', function(stream){ 
    stream.pause(); 
    streams.push(stream); 
    ensureState(); 
    }); 

    streamStream.on('end', function() { 
    streamsEnded = true; 
    ensureState(); 
    }); 

    var ensureState = function() { 
    if(isStreaming) return; 
    if(streams.length == 0) { 
     if(streamsEnded) 
     that.emit('end'); 
     return; 
    } 
    isStreaming = true; 
    streams[0].on('data', onData); 
    streams[0].on('end', onEnd); 
    streams[0].resume(); 
    }; 

    var onData = function(data) { 
    that.emit('data', data); 
    }; 

    var onEnd = function() { 
    isStreaming = false; 
    streams[0].removeAllListeners('data'); 
    streams[0].removeAllListeners('end'); 
    streams.shift(); 
    ensureState(); 
    }; 
} 

util.inherits(ConcatStream, EventEmitter); 

我们保持状态的跟踪与streams(流的队列; push到背部和shift从前面),isStreamingstreamsEnded。当我们得到一个新的流时,我们推动它,当流结束时,我们停止聆听并转移它。当流的流结束时,我们设置streamsEnded

在这些事件中,我们检查我们所处的状态。如果我们已经进行了流式处理(管道流),那么我们什么也不做。如果队列为空并且设置了streamsEnded,则我们发出end事件。如果队列中有某些东西,我们恢复并听取其中的事件。

*请注意,pauseresume是咨询性的,所以一些流可能不正确,并需要缓冲。这个练习留给读者。

做完这一切,我会通过构建一个EventEmitter,用它创建一个ConcatStream,并释放两种stream事件后跟end事件做n=2情况。我相信它可以做得更简洁,但我们也可以使用我们已有的东西。

+0

感谢阿龙!我有点希望会有一些现有的图书馆,所以我可以用三行来解决它。如果没有,我想我可能会将您的解决方案提取到一个包中。我可以根据MIT许可证使用您的代码吗? – 2013-05-08 18:40:11

+0

啊,找到了流溪流库。看到我的答案。 – 2013-05-08 19:13:28

+0

@JoLiss我也先找了点东西,但是我找不到那个选项。如果你仍然想,你当然可以在库中使用我的代码。 – 2013-05-08 21:13:43

1

streamee.js是一组基于节点1的流变换器和作曲家。0+流和包括串连方法:

var stream1ThenStream2 = streamee.concatenate([stream1, stream2]); 
+0

谢谢,我会检查出来。我假设这是0.10节点? – 2013-06-01 18:27:37

+0

是节点0.10,但是您可以将旧式流包装为0.10+流,如自述文件中所写 – atamborrino 2013-06-02 13:17:53

2

https://github.com/joepie91/node-combined-stream2是一个下拉Streams2兼容的替代组合的流模块(其如上所述。)它自动换Streams1流。

的联合STREAM2示例代码:

var CombinedStream = require('combined-stream2'); 
var fs = require('fs'); 

var combinedStream = CombinedStream.create(); 
combinedStream.append(fs.createReadStream('file1.txt')); 
combinedStream.append(fs.createReadStream('file2.txt')); 

combinedStream.pipe(fs.createWriteStream('combined.txt')); 
3

这可以用香草做的NodeJS

import { PassThrough } from 'stream' 
const merge = (...streams) => { 
    let pass = new PassThrough() 
    let waiting = streams.length 
    for (let stream of streams) { 
     pass = stream.pipe(pass, {end: false}) 
     stream.once('end',() => --waiting === 0 && pass.emit('end')) 
    } 
    return pass 
} 
5

简单reduce操作应在nodejs被罚款!

const {PassThrough} = require('stream') 

let joined = [s0, s1, s2, ...sN].reduce((pt, s, i, a) => { 
    s.pipe(pt, {end: false}) 
    s.once('end',() => a.every(s => s.ended) && pt.emit('end')) 
    return pt 
}, new PassThrough()) 

干杯;)

+0

不应该从reduce中返回什么东西吗?这看起来像“joined”将是未定义的。 – 2017-09-29 20:22:17

+1

已修复。谢谢;)@Mark_M – Ivo 2017-10-03 17:57:15

+0

警告:这将导致所有流并行传输到PassThrough流,而不考虑数据的排序,这可能会损坏您的数据。 – 2018-01-16 15:55:25