我喜欢你的解决方案(尽管我怀疑你可能永远不会看到第一个发射值,如果你有一个热流 - 如果源是冷的,一切似乎都很好)。你可以让一个jsfiddle
检查出来吗?如果你不想错过任何价值,你的解决方案是最好的。如果这样做,可以通过将跳过的值添加回源来纠正它(obs1.take(1).map(val => obs1.startWith(val))
。
否则,对于通用冗长的解决方案,此处的关键是您有状态,因此您还需要scan
我们用一个索引标记源,并且我们保持一个状态,它代表已经开始的源的索引,当所有的索引都已经开始时,我们知道那个未索引的索引,并且我们只选择这些值从一个。请注意,这应该独立工作是否所有在一个通所做的来源是热或冷,I,E,不存在多个订阅。
Rx.Observable.merge(
obs1.map(val => {val, sourceId: 1})
obs2.map(val => {val, sourceId: 2})
obsn.map(val => {val, sourceId: n})
).scan(
(acc, valueStruct) => {
acc.valueStruct = valueStruct
acc.alreadyEmitted[valueStruct.sourceId - 1] = true
if (acc.alreadyEmitted.filter(Boolean).length === n - 1) {
acc.lastSourceId = 1 + acc.alreadyEmitted.findIndex(element => element === false)
}
return acc
}, {alreadyEmitted : new Array(n).fill(false), lastSourceId : 0, valueStruct: null}
)
.map (acc => acc.valueStruct.sourceId === acc.lastSourceId ? acc.valueStruct.val : null)
.filter(Boolean)
也许有更短的,我不知道。我会尽量把它放在一个小提琴中,看看它是否真的有效,或者如果你在告诉我之前做了。
没有小提琴,因为我只是亚庆这个好奇心,但是你的答案似乎很有趣。我明天会试试。感谢您花时间回答。 – n00dl3
我想我会坚持我的解决方案,因为它不太详细,所以您建议对热流进行修改。我试图从你的解决方案中做出一个片段,但有语法错误,我不想深入其中。无论如何,谢谢你的时间。 – n00dl3