2017-04-03 67 views
0

我有3个流(A,B,C),它们被传送到另一个(A-> B-> C)。 当我启动我的程序B的_read get立即被调用,因为它是通过管道传输给C的。但是,B流中没有数据,而A作为数据异步获取。一旦B接收到传递给B的_write方法的数据,它就会转换数据并发出一个'可读的'事件(我手动启动 - 这是这样做的方式吗?)。节点流:等待,直到数据可用

然而没有任何反应,B中的数据没有被任何人使用(因此B的_read未被调用)。我可以通过在_write()方法的末尾调用(在B上)this._read()来解决此问题。但是这也可能会将数据推送给消费者,尽管队列已满,对吗?

基本上我想发送一个更大的数据块到B流中,将它分成更小的数据,然后将这些数据一个接一个地传递给C。所以我想有某种缓冲区的B.

_read(size) { 
    if(this._lineBuffer.length > 0) { 
     var stop = false; 
     while(!stop) { 
      stop = this.push(this._lineBuffer.splice(0,1)); 
     } 
    } 
    if(this._pendingWriteAck) { 
     this._pendingWriteAck(); 
     this._pendingWriteAck = null; 
    } 
} 

_write(chunk, encoding, callback) { 
    console.log("New chunk for line splitter received"); 
    if(this._buffer) { 
     this._buffer = Buffer.concat([this._buffer, chunk]); 
    } else { 
     this._buffer = chunk; 
    } 
    for (; this._offset < this._buffer.length; this._offset++) { 
     if (this._buffer[this._offset] === 0x0a) { // 0x0a is a new line 
      this._lineBuffer.push(this._buffer.slice(0, this._offset).toString()); 
      this._buffer = this._buffer.slice(this._offset + 1); 
      this._offset = 0; 
     } 
    } 

    if(this._lineBuffer.length > this._maxLineBuffer || this._buffer.length > this._maxRawBuffer) { 
     console.log("Line Split buffer has reached capacity. Waiting..."); 
     this._pendingWriteAck = callback; 
    } else { 
     callback(); 
    } 

    setImmediate(()=>{ 
     this.emit('readable'); 
     this._read(); 
    }) 
} 

回答

0

您可以使用的“B”流的变换流:

const Transform = require('stream').Transform; 

const B = new Transform({ 
    transform(chunk, encoding, callback) { 
    this._offset = this._offset || 0; 
    this._buffer = this._buffer ? Buffer.concat([this._buffer, chunk]) : chunk 
    for (; this._offset < this._buffer.length; this._offset++) { 
     if (this._buffer[this._offset] === 0x0a) { // 0x0a is a new line 
     if (this._offset) { 
      this.push(this._buffer.slice(0, this._offset), encoding); 
     } 
     this._buffer = this._buffer.slice(this._offset + 1); 
     this._offset = 0; 
     } 
    } 
    callback() 
    }, 
    flush(callback) { 
    if (this._buffer && this._buffer.length) { 
     this.push(this._buffer); 
    } 
    callback(); 
    } 
}); 

你可以看到它通过做类似工作:

let line = 0 
B.on('data', (chunk) => process.stdout.write(`${++line}. ${chunk}\n`)) 
B.write(['Foo', 'Bar', 'Baz', 'Qux', 'Hello '].join('\n')) 
B.write(['World', 'The End'].join('\n')) 
B.end() 

输出到终端将是:

1. Foo 
2. Bar 
3. Baz 
4. Qux 
5. Hello World 
6. The End 
+0

但与可读流不同,如果'C'或B和C之间的队列实际上已满,对不对?在我的情况下,制片人A的速度快于'C',我想用B也对A的背压。 – newBee