2017-08-17 41 views
0

RxJava和Reactive Programming的新手可以这么说。并行Rx Java地图函数

我试图将两个函数并行映射为单个Observable管道的一部分,但似乎不能这样工作。这是我的代码。

Observable.fromCallable(thatReturnsNumberOne()) 
       .observeOn(newThread()) 
       .map(doubleIt()) 
       .observeOn(newThread()) 
       .map(doubleIt()) 
       .subscribe(testSubscriber); 

我想2 doubleIt()调用在同一时间产生。但总是看起来,一旦第一个doubleIt()完成,只有第二个开始。即阻塞/顺序。

我错过了什么?

回答

1

我假设thatReturnsNumberOne()只返回一个值。返回的值按顺序传递给每个运算符。通过使用observeOn(newThread()),当值达到链中的那个点时,您只能更改为新线程。

如果你想要做的计算并行,你必须使用多个观测:

Observable.fromCallable(thatReturnsNumberOne()) 
    .flatMap(number -> Observable.fromCallable(doubleIt(number)).subscribeOn(newThread()) 
     .combineLatest(Observable.fromCallable(doubleIt(number)).subscribeOn(newThread()), 
     doubles -> doubles[0] + doubles[1])) 
    .subscribe(testSubscriber);