2013-07-04 37 views
17

如果可能,我想通过管道将两个Node.js流合并为一个。我正在使用Transform流。从两个管道流创建一个Node.js流

换句话说,我想让我的图书馆返回myStream供人们使用。例如,它们可以这样写:

process.stdin.pipe(myStream).pipe(process.stdout); 

和国内我使用第三方vendorStream,做了一些工作,插到我这包含在myInternalStream自己的逻辑。那么上面的内容会转化为:

process.stdin.pipe(vendorStream).pipe(myInternalStream).pipe(process.stdout); 

我可以这样做吗?我试过var myStream = vendorStream.pipe(myInternalStream)但这显然不起作用。

打个比方与bash,比方说,我想编写一个程序来检查,如果信h存在于一些流(tail -n 1 | grep h)的最后一行,我可以创建一个shell脚本:

# myscript.sh 
tail -n 1 | grep h 

然后,如果人做:

$ printf "abc\ndef\nghi" | . myscript.sh 

这只是工作。

这是我到目前为止有:

// Combine a pipe of two streams into one stream 

var util = require('util') 
    , Transform = require('stream').Transform; 

var chunks1 = []; 
var stream1 = new Transform(); 
var soFar = ''; 
stream1._transform = function(chunk, encoding, done) { 
    chunks1.push(chunk.toString()); 
    var pieces = (soFar + chunk).split('\n'); 
    soFar = pieces.pop(); 
    for (var i = 0; i < pieces.length; i++) { 
    var piece = pieces[i]; 
    this.push(piece); 
    } 
    return done(); 
}; 

var chunks2 = []; 
var count = 0; 
var stream2 = new Transform(); 
stream2._transform = function(chunk, encoding, done) { 
    chunks2.push(chunk.toString()); 
    count = count + 1; 
    this.push(count + ' ' + chunk.toString() + '\n'); 
    done(); 
}; 

var stdin = process.stdin; 
var stdout = process.stdout; 

process.on('exit', function() { 
    console.error('chunks1: ' + JSON.stringify(chunks1)); 
    console.error('chunks2: ' + JSON.stringify(chunks2)); 
}); 
process.stdout.on('error', process.exit); 


// stdin.pipe(stream1).pipe(stream2).pipe(stdout); 

// $ (printf "abc\nd"; sleep 1; printf "ef\nghi\n") | node streams-combine.js 
// Outputs: 
// 1 abc 
// 2 def 
// 3 ghi 
// chunks1: ["abc\nd","ef\nghi\n"] 
// chunks2: ["abc","def","ghi"] 

// Best working solution I could find 
var stream3 = function(src) { 
    return src.pipe(stream1).pipe(stream2); 
}; 
stream3(stdin).pipe(stdout); 

// $ (printf "abc\nd"; sleep 1; printf "ef\nghi\n") | node streams-combine.js 
// Outputs: 
// 1 abc 
// 2 def 
// 3 ghi 
// chunks1: ["abc\nd","ef\nghi\n"] 
// chunks2: ["abc","def","ghi"] 

这是在所有可能的?让我知道,如果我想做的事情不明确。

谢谢!

回答

25

你可以看的东西要通过管道输送到你的流,然后unpipe它和管道它流您有兴趣:

var PassThrough = require('stream').PassThrough; 

var stream3 = new PassThrough(); 

// When a source stream is piped to us, undo that pipe, and save 
// off the source stream piped into our internally managed streams. 
stream3.on('pipe', function(source) { 
    source.unpipe(this); 
    this.transformStream = source.pipe(stream1).pipe(stream2); 
}); 

// When we're piped to another stream, instead pipe our internal 
// transform stream to that destination. 
stream3.pipe = function(destination, options) { 
    return this.transformStream.pipe(destination, options); 
}; 

stdin.pipe(stream3).pipe(stdout); 

您可以提取这个功能集成到自己施工的流类:

var util = require('util'); 
var PassThrough = require('stream').PassThrough; 

var StreamCombiner = function() { 
    this.streams = Array.prototype.slice.apply(arguments); 

    this.on('pipe', function(source) { 
    source.unpipe(this); 
    for(i in this.streams) { 
     source = source.pipe(this.streams[i]); 
    } 
    this.transformStream = source; 
    }); 
}; 

util.inherits(StreamCombiner, PassThrough); 

StreamCombiner.prototype.pipe = function(dest, options) { 
    return this.transformStream.pipe(dest, options); 
}; 

var stream3 = new StreamCombiner(stream1, stream2); 
stdin.pipe(stream3).pipe(stdout); 
+0

非常感谢@brandon,这是真棒!更新了我的要点https://gist.github.com/nicolashery/5910969 –

+0

这太棒了。我正在考虑做类似的事情,但我只是没有信心,我并没有错过一些让我的解决方案错误的微妙之处。感谢您的信任 – FellowMD

+0

FWIW,为了使这个解决方案能够工作,您需要将stream3管道传输到标准输出(在这种情况下是stdin)。所以,没有stream3.pipe(stdout); stream3.write(数据);但这是一个很大的帮助!谢谢! –

2

一种选择是可能使用multipipe它可以让你连锁多个变换一起包装成一个单一的变换流:

// my-stream.js 
var multipipe = require('multipipe'); 

module.exports = function createMyStream() { 
    return multipipe(vendorStream, myInternalStream); 
}; 

然后,你可以这样做:

var createMyStream = require('./my-stream'); 

var myStream = createMyStream(); 

process.stdin.pipe(myStream).pipe(process.stdout);