2017-04-18 79 views
0

我有一些我并行执行的observable集合,如localObservablenetworkObservable。如果networkObservable开始发射物品(从此时起,我只需要这些物品),然后丢弃由localObservable发出的物品(也许localObservable尚未开始)。rxjava开关可观察,如果第二个可观察的开始发射项目

Observable<Integer> localObservable = 
      Observable.defer(() -> Observable.range(1, 10)).subscribeOn(Schedulers.io()); 
Observable<Integer> networkObservable = 
      Observable.defer(() -> Observable.range(11, 20)).subscribeOn(Schedulers.io()); 

回答

2

你可以做这样的事情:

Observable<Long> networkObservable = 
      Observable.interval(1000, 500, TimeUnit.MILLISECONDS) 
        .subscribeOn(Schedulers.io()) 
        .share(); 
    Observable<Long> localObservable = 
      Observable.interval(500, TimeUnit.MILLISECONDS)      
        .subscribeOn(Schedulers.io()) 
        .takeUntil(networkObservable); 

    Observable.merge(networkObservable, localObservable) 
      .subscribe(System.out::println); 

这将输出:

0 // localObservable 
1 // localObservable 
0 // networkObservable from here on 
1 
2 
... 

takeUntil会让localObservable停止和取消时,从networkObservable第一发射发生,所以合并Observable将从localObservable发出只要networkObservable未启动ed,当它发生时,它将停止从localObservable发射并切换为仅从networkObservable发射。

+0

它的工作。感谢你的回答。 – xymelon

+0

如果'networkObservable'发出错误,我想继续使用'localObservable',我该怎么做? – xymelon

+0

你可以使用mergeDelayError(在这种情况下,你会在localObservable发射一些东西之后得到onError),或者只是用onErrorREsumeNext或类似的方法捕获networkObservable的所有错误 – yosriz

0

有由运营商这样的一个简单的解决办法:AMB

只要看看的System.out的输出。

文档:http://reactivex.io/documentation/operators/amb.html

基本上你订阅观察到两者在同一时间和任何可观察到的第一发射获得通过。其他观察对象将取消订阅。

@Test 
public void ambTest() throws Exception { 
    TestScheduler testScheduler = new TestScheduler(); 

    Observable<Integer> network = Observable.timer(1000, TimeUnit.MILLISECONDS, testScheduler) 
       .concatMap(aLong -> Observable.just(1, 2, 3)) 
       .doOnSubscribe(disposable -> System.out.println("connect network")) 
       .doOnDispose(() -> System.out.println("dispose network")); 

    Observable<Integer> local = Observable.timer(500, TimeUnit.MILLISECONDS, testScheduler) 
       .concatMap(aLong -> Observable.just(4, 5, 6)) 
       .doOnSubscribe(disposable -> System.out.println("connect local")) 
       .doOnDispose(() -> System.out.println("dispose local")); 

    Observable<Integer> integerObservable = Observable.ambArray(network, local); 

    TestObserver<Integer> test = integerObservable.test(); 

    testScheduler.advanceTimeBy(600, TimeUnit.MILLISECONDS); 

    test.assertValues(4, 5, 6); 

    testScheduler.advanceTimeBy(1000, TimeUnit.MILLISECONDS); 

    test.assertValues(4, 5, 6); 
} 
+0

这取决于需求,如果你想要从当网络可观测数据尚未发射时,局域可观测量,则局域网将只选择具有第一个发射的本地可观测数据,并且永远不会切换到网络可观测数据 – yosriz