2流:连接两个(或n)流
鉴于可读streams
stream1
和stream2
,什么是地道(简洁)的方式来得到含stream1
和stream2
级联流?我不能做
stream1.pipe(outStream); stream2.pipe(outStream)
,因为然后流内容混杂在一起。Ñ流:
给定一个EventEmitter发射不确定数目的流,例如
eventEmitter.emit('stream', stream1) eventEmitter.emit('stream', stream2) eventEmitter.emit('stream', stream3) ... eventEmitter.emit('end')
什么是一个地道的(简洁)的方式来得到与连接在一起所有流的流?
回答
combined-stream包将流连接起来。自述文件中的示例:
var CombinedStream = require('combined-stream');
var fs = require('fs');
var combinedStream = CombinedStream.create();
combinedStream.append(fs.createReadStream('file1.txt'));
combinedStream.append(fs.createReadStream('file2.txt'));
combinedStream.pipe(fs.createWriteStream('combined.txt'));
我相信您必须一次追加所有流。如果队列为空,则combinedStream
自动结束。请参阅issue #5。
stream-stream库是一个替代方案,它有一个明确的.end
,但它不太受欢迎,大概没有经过良好测试。它使用节点0.10的streams2 API(请参阅this discussion)。
你也许可以使它更加简洁,但这里有一个工程:
var util = require('util');
var EventEmitter = require('events').EventEmitter;
function ConcatStream(streamStream) {
EventEmitter.call(this);
var isStreaming = false,
streamsEnded = false,
that = this;
var streams = [];
streamStream.on('stream', function(stream){
stream.pause();
streams.push(stream);
ensureState();
});
streamStream.on('end', function() {
streamsEnded = true;
ensureState();
});
var ensureState = function() {
if(isStreaming) return;
if(streams.length == 0) {
if(streamsEnded)
that.emit('end');
return;
}
isStreaming = true;
streams[0].on('data', onData);
streams[0].on('end', onEnd);
streams[0].resume();
};
var onData = function(data) {
that.emit('data', data);
};
var onEnd = function() {
isStreaming = false;
streams[0].removeAllListeners('data');
streams[0].removeAllListeners('end');
streams.shift();
ensureState();
};
}
util.inherits(ConcatStream, EventEmitter);
我们保持状态的跟踪与streams
(流的队列; push
到背部和shift
从前面),isStreaming
和streamsEnded
。当我们得到一个新的流时,我们推动它,当流结束时,我们停止聆听并转移它。当流的流结束时,我们设置streamsEnded
。
在这些事件中,我们检查我们所处的状态。如果我们已经进行了流式处理(管道流),那么我们什么也不做。如果队列为空并且设置了streamsEnded
,则我们发出end
事件。如果队列中有某些东西,我们恢复并听取其中的事件。
*请注意,pause
和resume
是咨询性的,所以一些流可能不正确,并需要缓冲。这个练习留给读者。
做完这一切,我会通过构建一个EventEmitter
,用它创建一个ConcatStream
,并释放两种stream
事件后跟end
事件做n=2
情况。我相信它可以做得更简洁,但我们也可以使用我们已有的东西。
streamee.js是一组基于节点1的流变换器和作曲家。0+流和包括串连方法:
var stream1ThenStream2 = streamee.concatenate([stream1, stream2]);
谢谢,我会检查出来。我假设这是0.10节点? – 2013-06-01 18:27:37
是节点0.10,但是您可以将旧式流包装为0.10+流,如自述文件中所写 – atamborrino 2013-06-02 13:17:53
https://github.com/joepie91/node-combined-stream2是一个下拉Streams2兼容的替代组合的流模块(其如上所述。)它自动换Streams1流。
的联合STREAM2示例代码:
var CombinedStream = require('combined-stream2');
var fs = require('fs');
var combinedStream = CombinedStream.create();
combinedStream.append(fs.createReadStream('file1.txt'));
combinedStream.append(fs.createReadStream('file2.txt'));
combinedStream.pipe(fs.createWriteStream('combined.txt'));
这可以用香草做的NodeJS
import { PassThrough } from 'stream'
const merge = (...streams) => {
let pass = new PassThrough()
let waiting = streams.length
for (let stream of streams) {
pass = stream.pipe(pass, {end: false})
stream.once('end',() => --waiting === 0 && pass.emit('end'))
}
return pass
}
const {PassThrough} = require('stream')
let joined = [s0, s1, s2, ...sN].reduce((pt, s, i, a) => {
s.pipe(pt, {end: false})
s.once('end',() => a.every(s => s.ended) && pt.emit('end'))
return pt
}, new PassThrough())
干杯;)
不应该从reduce中返回什么东西吗?这看起来像“joined”将是未定义的。 – 2017-09-29 20:22:17
已修复。谢谢;)@Mark_M – Ivo 2017-10-03 17:57:15
警告:这将导致所有流并行传输到PassThrough流,而不考虑数据的排序,这可能会损坏您的数据。 – 2018-01-16 15:55:25
- 1. 连接两个Java流
- 2. 回流连接两个表
- 3. 连接两个或由XSL
- 4. 如何连接两个或更多gzip文件/流
- 5. 连接两个流功能的C++
- 6. 如何使用gulp连接两个流?
- 7. 蓝牙连接两个设备(流)
- 8. 使用连接连接两个或多个表
- 9. 正则表达式匹配两个两个字(或n乘n)
- 10. 如何在CakePHP中的第三个n..n(hasAndBelongsToMany)关系中连接两个表?
- 11. tensorflow完全连接控制流程每个n-epoch摘要
- 12. 连接两个ObjectDataSources
- 13. 连接两个行
- 14. 连接两个表
- 15. 连接两个dataframes
- 16. 连接两个表
- 17. 连接两个表
- 18. 连接两个表
- 19. 连接两个表
- 20. 连接两个表
- 21. 连接两个类?
- 22. Codeigniter根据两个连接条件连接两个表
- 23. 连接到多个表或在ASP.NET MVC C中连接两个表#
- 24. mySql N到N双连接与空 - CROSS连接从答案
- 25. 连接两个或多个表合并成一个表
- 26. 流星连接
- 27. 连接流
- 28. 将n×n板切成n个连接的n-minos的最快实现
- 29. Mysql的内连接两次,使用WHERE上两个连接
- 30. MYSQL - 连接两个表
感谢阿龙!我有点希望会有一些现有的图书馆,所以我可以用三行来解决它。如果没有,我想我可能会将您的解决方案提取到一个包中。我可以根据MIT许可证使用您的代码吗? – 2013-05-08 18:40:11
啊,找到了流溪流库。看到我的答案。 – 2013-05-08 19:13:28
@JoLiss我也先找了点东西,但是我找不到那个选项。如果你仍然想,你当然可以在库中使用我的代码。 – 2013-05-08 21:13:43