有由运营商这样的一个简单的解决办法:AMB
只要看看的System.out的输出。
文档:http://reactivex.io/documentation/operators/amb.html
基本上你订阅观察到两者在同一时间和任何可观察到的第一发射获得通过。其他观察对象将取消订阅。
@Test
public void ambTest() throws Exception {
TestScheduler testScheduler = new TestScheduler();
Observable<Integer> network = Observable.timer(1000, TimeUnit.MILLISECONDS, testScheduler)
.concatMap(aLong -> Observable.just(1, 2, 3))
.doOnSubscribe(disposable -> System.out.println("connect network"))
.doOnDispose(() -> System.out.println("dispose network"));
Observable<Integer> local = Observable.timer(500, TimeUnit.MILLISECONDS, testScheduler)
.concatMap(aLong -> Observable.just(4, 5, 6))
.doOnSubscribe(disposable -> System.out.println("connect local"))
.doOnDispose(() -> System.out.println("dispose local"));
Observable<Integer> integerObservable = Observable.ambArray(network, local);
TestObserver<Integer> test = integerObservable.test();
testScheduler.advanceTimeBy(600, TimeUnit.MILLISECONDS);
test.assertValues(4, 5, 6);
testScheduler.advanceTimeBy(1000, TimeUnit.MILLISECONDS);
test.assertValues(4, 5, 6);
}
它的工作。感谢你的回答。 – xymelon
如果'networkObservable'发出错误,我想继续使用'localObservable',我该怎么做? – xymelon
你可以使用mergeDelayError(在这种情况下,你会在localObservable发射一些东西之后得到onError),或者只是用onErrorREsumeNext或类似的方法捕获networkObservable的所有错误 – yosriz