2017-03-31 86 views
0

因此,我创建了一个读取流,首先连接到SFTP并开始从文件读取。在任何时候,我的代码都可以重新设置readstream并执行其他操作。例如,我可能会使用它来获取CSV的前几行并停止阅读。节点流 - 在可读流中收听unpipe

问题是,我不知道如何在我的readStream构造函数中侦听unpipe事件,以便我可以正确关闭SFTP连接。我在写入流中使用flush方法,是否有类似于读取流的方法?

这里是我的readStream构造的简化部分:

const Client = require('ssh2').Client, 
     nom = require('noms'); 

function getStream (get) { 
    const self = this; 
    const conn = new Client(); 

    let client, 
     fileData, 
     buffer, 
     totalBytes = 0, 
     bytesRead = 0; 

    let read = function(size,next) { 
     const read = this; 
     // Read each chunk of the file 
     client.read(fileData, buffer, bytesRead, size, bytesRead, 
      function (err, byteCount, buff, pos) { 
       bytesRead += byteCount; 
       read.push(buff); 
       next(); 
      } 
     ); 
    }; 

    let before = function(start) { 
     // setup the connection BEFORE we start _read 
     conn.on('ready', function(){ 
      conn.sftp(function(err,sftp) { 
       sftp.open(get, 'r', function(err, fd){ 
        sftp.fstat(fd, function(err, stats) { 
         client = sftp; 
         fileData = fd; 
         totalBytes = stats.size; 
         buffer = new Buffer(totalBytes); 

         start(); 
        }); 
       }); 
      }); 
     }).connect(credentials); 
    }; 

    return nom(read,before); 
} 

后来我可以称之为myStream.pipe(writeStream)然后myStream.unpipe()。但是因为我没有办法监听该事件,读取停止,但SFTP连接保持打开并最终超时。

任何想法?

回答

0

因此,在进行了更多研究之后,我了解到当您致电readStream.unpipe(writeStream)时,ReadStreams不会通过unpipe事件。该事件只传递给writeStream。为了监听unpipe,你需要在readStream明确地发出一个事件,像这样:

readStream.emit('unpipe'); 

您可以侦听此事件的任何位置,内部或流构造函数,它真的很方便之外。所以,这将使这样看上面的代码:

故事
function getStream (get) { 
    /** 
    * ... stuff 
    * ... read() 
    * ... before() 
    * ... etc 
    */ 

    let readStream = nom(read,before); 

    readStream.on('unpipe', function(){ 
     console.log('called unpipe on read stream'); 
    }); 

    return readStream; 
} 

道德,流已经有Event Emitter class methods,这样你就可以发出并监听自定义事件开箱。