2017-04-24 59 views
2

我有两个观察对象,我想听最后发出其第一个值的那个,是否有一个操作符?类似的东西:是 - RxJS中的“race”操作符有相反的地方吗?

let obs1 = Rx.Observable.timer(500,500); 
let obs2 = Rx.Observable.timer(1000,1000); // I want the values from this one 
let sloth = Rx.Observable.sloth(obs1,obs2); 

其中sloth观察到的,因为它是谁最后发射它的第一个值的一个从obs2将发出值。

如果情况并非如此,还有其他方法吗?

回答

1

我看这种可能性,现在,但我很好奇,如果有人找别的:

let obs1 = Rx.Observable.timer(500,500).map(i=>`cheetah ${i}`); 
 
let obs2 = Rx.Observable.timer(1000,1000).map(i=>`sloth ${i}`); 
 
let sloth = Rx.Observable.merge(
 
    obs1.take(1).mapTo(obs1), 
 
    obs2.take(1).mapTo(obs2) 
 
).takeLast(1).mergeAll() 
 

 
sloth.subscribe(data=>console.log(data))
<script src="https://unpkg.com/@reactivex/[email protected]/dist/global/Rx.js"></script>

编辑由@ user3743222(非常漂亮的绰号指出: - D),它不适用于热观察,但这应该是罚款:

let obs1 = Rx.Observable.timer(500,500).map(i=>`cheetah ${i}`).publish(); 
 
let obs2 = Rx.Observable.timer(1000,1000).map(i=>`sloth ${i}`).publish(); 
 
obs1.connect(); 
 
obs2.connect(); 
 
let sloth = Rx.Observable.merge(
 
    obs1.take(1).map((val)=>obs1.startWith(val)), 
 
    obs2.take(1).map((val)=>obs2.startWith(val)) 
 
).takeLast(1).mergeAll(); 
 
    
 
sloth.subscribe(data=>console.log(data));
<script src="https://unpkg.com/@reactivex/[email protected]/dist/global/Rx.js"></script>

1

我喜欢你的解决方案(尽管我怀疑你可能永远不会看到第一个发射值,如果你有一个热流 - 如果源是冷的,一切似乎都很好)。你可以让一个jsfiddle检查出来吗?如果你不想错过任何价值,你的解决方案是最好的。如果这样做,可以通过将跳过的值添加回源来纠正它(obs1.take(1).map(val => obs1.startWith(val))

否则,对于通用冗长的解决方案,此处的关键是您有状态,因此您还需要scan我们用一个索引标记源,并且我们保持一个状态,它代表已经开始的源的索引,当所有的索引都已经开始时,我们知道那个未索引的索引,并且我们只选择这些值从一个。请注意,这应该独立工作是否所有在一个通所做的来源是热或冷,I,E,不存在多个订阅。

Rx.Observable.merge(
    obs1.map(val => {val, sourceId: 1}) 
    obs2.map(val => {val, sourceId: 2}) 
    obsn.map(val => {val, sourceId: n}) 
).scan( 
(acc, valueStruct) => { 
    acc.valueStruct = valueStruct 
    acc.alreadyEmitted[valueStruct.sourceId - 1] = true 
    if (acc.alreadyEmitted.filter(Boolean).length === n - 1) { 
    acc.lastSourceId = 1 + acc.alreadyEmitted.findIndex(element => element === false) 
    } 
    return acc 
}, {alreadyEmitted : new Array(n).fill(false), lastSourceId : 0, valueStruct: null} 
) 
.map (acc => acc.valueStruct.sourceId === acc.lastSourceId ? acc.valueStruct.val : null) 
.filter(Boolean) 

也许有更短的,我不知道。我会尽量把它放在一个小提琴中,看看它是否真的有效,或者如果你在告诉我之前做了。

+0

没有小提琴,因为我只是亚庆这个好奇心,但是你的答案似乎很有趣。我明天会试试。感谢您花时间回答。 – n00dl3

+0

我想我会坚持我的解决方案,因为它不太详细,所以您建议对热流进行修改。我试图从你的解决方案中做出一个片段,但有语法错误,我不想深入其中。无论如何,谢谢你的时间。 – n00dl3

1

这个怎么样?

let obs1 = Rx.Observable.timer(500,500); 
let obs2 = Rx.Observable.timer(1000,1000); 

let sloth = Rx.Observable.race(
    obs1.take(1).concat(obs2), 
    obs2.take(1).concat(obs1) 
).skip(1); 

此外,与多个观测量的功能支持:

let sloth = (...observables) => 
    observables.length === 1 ? 
     observables[0] : 
    observables.length === 2 ? 
     Rx.Observable.race(
      observables[0].take(1).concat(observables[1]), 
      observables[1].take(1).concat(observables[0]) 
     ).skip(1) : 
    observables.reduce((prev, current) => sloth(prev, current))[0]; 
+0

非常好的解决方案,谢谢! – n00dl3

+0

如果有两个以上可观察的情况,你会怎么做? – n00dl3

+0

当然,很好的问题! 多个观察对象会是这样的: let sloth =(... observables)=> \t observables.length === 1? \t \t observables [0]: \t observables.length === 2? \t \t Rx.Observable.race( \t \t \t观测[0]。取(1).concat(观测值[1]), \t \t \t可观测量[1]。取(1).concat(观测值[0 ]) \t \t).skip(1): \t observables.reduce((结果,电流)=> \t \t懒惰(结果,电流), \t \t Observable.of( '即时项被忽略') \t)[0]; – ZahiC

相关问题