2015-03-24 69 views
1

我有一串流通过电话线发送,需要一点时间才能完全发送,所以我想要显示它在飞行中有多远。我知道你可以监听数据流的'数据'事件,但是在较新版本的节点中,它也会将流设置为“流动模式”。我想确保我正确地做到了这一点。如何正确计算已处理的node.js流的字节数?

目前,我有以下的东西:

deploymentPackageStream.pause() // to prevent it from entering "flowing mode" 
var bytesSent = 0 
deploymentPackageStream.on('data', function(data) { 
    bytesSent+=data.length 
    process.stdout.write('\r     ') 
    process.stdout.write('\r'+(bytesSent/1000)+'kb sent') 
}) 
deploymentPackageStream.resume() 

// copy over the deployment package 
execute(conn, 'cat > deploymentPackage.sh', deploymentPackageStream).wait()  

这给了我正确的bytesSent输出,但所得到的包装似乎缺少关闭前的一些数据。如果我在执行复制行(最后一行)后放置“简历”行,它不会复制任何内容。如果我不恢复,它也不会复制任何内容。发生了什么事情,如何在不中断流的情况下正确执行此操作,并且无需进入流动模式(我想要背压)?

我应该提,我还在使用v0.10.x

+0

流动模式让你读取数据,如进来,并暂停模式,则需要显式调用'stream.read ()'来获取数据。流动模式是不是你想要的? – gregnr 2015-03-25 05:55:43

+0

@gregnr流动模式意味着没有背压吗?那里的最后一行通过网络发送流,我想为此反压。我真正想要的是最后一行请求数据(通过stream.read()某处),以及* both *复制代码*和*字节发送打印代码以获取数据(通过事件字节发送的东西,并通过stream.read()的最后一行,通过网络发送它)。 – 2015-03-25 18:33:35

回答

0

好吧,我做的东西,本质上是一种直通节点,但它有调用数据的回调:

// creates a stream that can view all the data in a stream and passes the data through 
// parameters: 
    // stream - the stream to peek at 
    // callback - called when there's data sent from the passed stream 
var StreamPeeker = exports.StreamPeeker = function(stream, callback) { 
    Readable.call(this) 
    this.stream = stream 

    stream.on('readable', function() { 
     var data = stream.read() 
     if(data !== null) { 
      if(!this.push(data)) stream.pause() 
      callback(data) 
     } 
    }.bind(this)) 

    stream.on('end', function() { 
     this.push(null) 
    }.bind(this)) 
} 
util.inherits(StreamPeeker, Readable) 
StreamPeeker.prototype._read = function() { 
    this.stream.resume() 
} 

如果我理解正确的流,这应该适当地处理背压。

利用这一点,我就可以计数data.length回调是这样的:

var peeker = new StreamPeeker(stream, function(data) { 
    // use data.length 
}) 
peeker.pipe(destination) 
+0

这似乎没有正确的工作 - 背压似乎没有被应用。它贯穿数据,但另一方面,实际上可能需要几分钟才能看到数据。喘气流 – 2015-03-27 08:03:25