2016-07-27 85 views
0

最后一个事件之后发生的事情有两个流,这永远不会完成一个流事件来自第二流。如何获得从其他流

在开始的时候,它看起来是这样的:

--a-> 
(?) 
----> 
(=) 
--a-> 

第二流发出它看起来像这样第一个事件之后:

--a---b-c-d-> 
    (?) 
-----1------> 
    (=) 
------b-c-d-> 

第二流中一个新的事件之后:

--a---b-c-d---e--f-> 
     (?) 
-----1-------2-----> 
     (=) 
--------------e--f-> 

等等...哪一组操作员需要这样做?

+1

这是一个很好的问题!我将看看这个[RxJS 5运营商的示例:combineLatest](https://gist.github.com/btroncone/d6cf141d6f2c00dc6b35#combinelatest),看看我能不能拿出一些东西。 – wolfhoundjesse

回答

2

您可以使用CombineLatest返回的事件,你想,例如:

/* Have staggering intervals */ 
var source1 = Rx.Observable.interval(1000) 
    .map(function(i) { 

     return { 
      data: (i + 1), 

      time: new Date() 
     }; 
    }); 

var source2 = Rx.Observable.interval(3000).startWith(-1) 
    .map(function(i) { 

     return { 
      data: (i + 1), 
      time: new Date() 
     }; 
    }); 

// Combine latest of source1 and source2 whenever either gives a value with a selector 
var source = Rx.Observable.combineLatest(
    source1, 
    source2, 
    function(s1, s2) { 

     if (s1.time > s2.time) { 
      return s1.data + ', ' + s2.data; 
     } 

     return undefined; 
    } 
).filter(x => x !== undefined).take(10); 

var subscription = source.subscribe(
    function(x) { 
     console.log('Next: %s', x); 
    }, 
    function(err) { 
     console.log('Error: %s', err); 
    }, 
    function() { 
     console.log('Completed'); 
    }); 
0

我不是100%的大理石,因为他们似乎有点矛盾,但我会提出一个替代方案是使用windowbuffer如果您只需将第二个来源的事件之后发生的事件组合在一起。

var count = 0; 
//Converts the source into an Observable of Observable windows 
source1.window(source2).subscribe(w => { 
    var local = count++; 
    //Just to show that this is indeed an Observable 
    //Subscribe to the inner Observable 
    w.subscribe(x => console.log(local + ': ' + x)); 
}); 

注意,如果你希望他们与其他来源的价值观相结合然后通过各种手段使用combineLatestwithLatestFrom由其他答案的建议这只能从第一源发射的值。