2016-12-26 70 views
0

我有一个来自分叉进程的无限数据流。我希望这个流被一个模块处理,有时我想复制这个流中的数据以便由不同的模块处理(例如,监视数据流,但是如果发生任何有趣的事情,我想记录下n个字节以便为进一步的调查)。NodeJS流拆分

因此,让我们假设以下情形:

  1. 我启动该程序并开始消耗可读流
  2. 2秒后,我想用不同的流读取器来处理1秒的相同数据
  3. 一旦时间到了,我想关闭第二个消费者,但最初的消费者必须保持不动。

下面的代码片段此:

var stream = process.stdout; 

stream.pipe(detector); // Using the first consumer 

function startAnotherConsumer() { 
    stream2 = new PassThrough(); 
    stream.pipe(stream2); 

    // use stream2 somewhere else 
} 

function stopAnotherConsumer() { 
    stream.unpipe(stream2); 
} 

我在这里的问题是,unpiping的STREAM2没有得到它关闭。如果我的unpipe命令后调用stream.end(),然后将其与错误崩溃:

events.js:160 
     throw er; // Unhandled 'error' event 
    ^

Error: write after end 
    at writeAfterEnd (_stream_writable.js:192:12) 
    at PassThrough.Writable.write (_stream_writable.js:243:5) 
    at Socket.ondata (_stream_readable.js:555:20) 
    at emitOne (events.js:101:20) 
    at Socket.emit (events.js:188:7) 
    at readableAddChunk (_stream_readable.js:176:18) 
    at Socket.Readable.push (_stream_readable.js:134:10) 
    at Pipe.onread (net.js:548:20) 

我甚至试图暂停源流,以帮助从第二气流冲扫的缓冲区,但它没有工作之一:

function stopAnotherConsumer() { 
    stream.pause(); 
    stream2.once('unpipe', function() { 
     stream.resume(); 
     stream2.end(); 
    }); 
    stream.unpipe(stream2); 
} 

和以前一样的错误(写完后)。

如何解决问题?我最初的意图是从一个点复制流数据,然后在一段时间后关闭第二个数据流。

Note: I tried to use this answer to make it work.

回答

0

由于没有答案,我张贴我的(拼凑)解决方案。如果有人有更好的一个,不要阻止它。

一个新流:

const Writable = require('stream').Writable; 
const Transform = require('stream').Transform; 

class DuplicatorStream extends Transform { 
    constructor(options) { 
     super(options); 

     this.otherStream = null; 
    } 

    attachStream(stream) { 
     if (!stream instanceof Writable) { 
      throw new Error('DuplicatorStream argument is not a writeable stream!'); 
     } 

     if (this.otherStream) { 
      throw new Error('A stream is already attached!'); 
     } 

     this.otherStream = stream; 
     this.emit('attach', stream); 
    } 

    detachStream() { 
     if (!this.otherStream) { 
      throw new Error('No stream to detach!'); 
     } 

     let stream = this.otherStream; 
     this.otherStream = null; 
     this.emit('detach', stream); 
    } 

    _transform(chunk, encoding, callback) { 
     if (this.otherStream) { 
      this.otherStream.write(chunk); 
     } 

     callback(null, chunk); 
    } 
} 

module.exports = DuplicatorStream; 

和使用:

var stream = process.stdout; 
var stream2; 

duplicatorStream = new DuplicatorStream(); 
stream.pipe(duplicatorStream); // Inserting my duplicator stream in the chain 
duplicatorStream.pipe(detector); // Using the first consumer 

function startAnotherConsumer() { 
    stream2 = new stream.PassThrough(); 
    duplicatorStream.attachStream(stream2); 

    // use stream2 somewhere else 
} 

function stopAnotherConsumer() { 
    duplicatorStream.once('detach', function() { 
     stream2.end(); 
    }); 
    duplicatorStream.detachStream(); 
}