因此,我创建了一个读取流,首先连接到SFTP并开始从文件读取。在任何时候,我的代码都可以重新设置readstream并执行其他操作。例如,我可能会使用它来获取CSV的前几行并停止阅读。节点流 - 在可读流中收听unpipe
问题是,我不知道如何在我的readStream构造函数中侦听unpipe
事件,以便我可以正确关闭SFTP连接。我在写入流中使用flush
方法,是否有类似于读取流的方法?
这里是我的readStream构造的简化部分:
const Client = require('ssh2').Client,
nom = require('noms');
function getStream (get) {
const self = this;
const conn = new Client();
let client,
fileData,
buffer,
totalBytes = 0,
bytesRead = 0;
let read = function(size,next) {
const read = this;
// Read each chunk of the file
client.read(fileData, buffer, bytesRead, size, bytesRead,
function (err, byteCount, buff, pos) {
bytesRead += byteCount;
read.push(buff);
next();
}
);
};
let before = function(start) {
// setup the connection BEFORE we start _read
conn.on('ready', function(){
conn.sftp(function(err,sftp) {
sftp.open(get, 'r', function(err, fd){
sftp.fstat(fd, function(err, stats) {
client = sftp;
fileData = fd;
totalBytes = stats.size;
buffer = new Buffer(totalBytes);
start();
});
});
});
}).connect(credentials);
};
return nom(read,before);
}
后来我可以称之为myStream.pipe(writeStream)
然后myStream.unpipe()
。但是因为我没有办法监听该事件,读取停止,但SFTP连接保持打开并最终超时。
任何想法?