0
我有3个流(A,B,C),它们被传送到另一个(A-> B-> C)。 当我启动我的程序B的_read get立即被调用,因为它是通过管道传输给C的。但是,B流中没有数据,而A作为数据异步获取。一旦B接收到传递给B的_write方法的数据,它就会转换数据并发出一个'可读的'事件(我手动启动 - 这是这样做的方式吗?)。节点流:等待,直到数据可用
然而没有任何反应,B中的数据没有被任何人使用(因此B的_read未被调用)。我可以通过在_write()方法的末尾调用(在B上)this._read()来解决此问题。但是这也可能会将数据推送给消费者,尽管队列已满,对吗?
基本上我想发送一个更大的数据块到B流中,将它分成更小的数据,然后将这些数据一个接一个地传递给C。所以我想有某种缓冲区的B.
_read(size) {
if(this._lineBuffer.length > 0) {
var stop = false;
while(!stop) {
stop = this.push(this._lineBuffer.splice(0,1));
}
}
if(this._pendingWriteAck) {
this._pendingWriteAck();
this._pendingWriteAck = null;
}
}
_write(chunk, encoding, callback) {
console.log("New chunk for line splitter received");
if(this._buffer) {
this._buffer = Buffer.concat([this._buffer, chunk]);
} else {
this._buffer = chunk;
}
for (; this._offset < this._buffer.length; this._offset++) {
if (this._buffer[this._offset] === 0x0a) { // 0x0a is a new line
this._lineBuffer.push(this._buffer.slice(0, this._offset).toString());
this._buffer = this._buffer.slice(this._offset + 1);
this._offset = 0;
}
}
if(this._lineBuffer.length > this._maxLineBuffer || this._buffer.length > this._maxRawBuffer) {
console.log("Line Split buffer has reached capacity. Waiting...");
this._pendingWriteAck = callback;
} else {
callback();
}
setImmediate(()=>{
this.emit('readable');
this._read();
})
}
但与可读流不同,如果'C'或B和C之间的队列实际上已满,对不对?在我的情况下,制片人A的速度快于'C',我想用B也对A的背压。 – newBee