0
我有一个来自分叉进程的无限数据流。我希望这个流被一个模块处理,有时我想复制这个流中的数据以便由不同的模块处理(例如,监视数据流,但是如果发生任何有趣的事情,我想记录下n个字节以便为进一步的调查)。NodeJS流拆分
因此,让我们假设以下情形:
- 我启动该程序并开始消耗可读流
- 2秒后,我想用不同的流读取器来处理1秒的相同数据
- 一旦时间到了,我想关闭第二个消费者,但最初的消费者必须保持不动。
下面的代码片段此:
var stream = process.stdout;
stream.pipe(detector); // Using the first consumer
function startAnotherConsumer() {
stream2 = new PassThrough();
stream.pipe(stream2);
// use stream2 somewhere else
}
function stopAnotherConsumer() {
stream.unpipe(stream2);
}
我在这里的问题是,unpiping的STREAM2没有得到它关闭。如果我的unpipe
命令后调用stream.end()
,然后将其与错误崩溃:
events.js:160
throw er; // Unhandled 'error' event
^
Error: write after end
at writeAfterEnd (_stream_writable.js:192:12)
at PassThrough.Writable.write (_stream_writable.js:243:5)
at Socket.ondata (_stream_readable.js:555:20)
at emitOne (events.js:101:20)
at Socket.emit (events.js:188:7)
at readableAddChunk (_stream_readable.js:176:18)
at Socket.Readable.push (_stream_readable.js:134:10)
at Pipe.onread (net.js:548:20)
我甚至试图暂停源流,以帮助从第二气流冲扫的缓冲区,但它没有工作之一:
function stopAnotherConsumer() {
stream.pause();
stream2.once('unpipe', function() {
stream.resume();
stream2.end();
});
stream.unpipe(stream2);
}
和以前一样的错误(写完后)。
如何解决问题?我最初的意图是从一个点复制流数据,然后在一段时间后关闭第二个数据流。
Note: I tried to use this answer to make it work.