2017-06-19 88 views
0

我相信新的RxJava并试图从链路执行用于多个观测量并行执行的例子: RxJava Fetching Observables In Parallel为什么并行执行不会发生多个RXJava Observables?

尽管在上面的链接提供的示例中被并行执行观测,但是当我加一根线。睡眠(TIME_IN_MILLISECONDS)在forEach方法中,那么系统一次开始执行一个Observable。请帮我理解为什么Thread.sleep会停止并行执行Observables。

下面是变形例这是造成多个观测的同步执行:

import rx.Observable; 
import rx.Subscriber; 
import rx.schedulers.Schedulers; 

public class ParallelExecution { 

    public static void main(String[] args) { 
     System.out.println("------------ mergingAsync"); 
     mergingAsync(); 
    } 

    private static void mergingAsync() { 
     Observable.merge(getDataAsync(1), getDataAsync(2)).toBlocking() 
     .forEach(x -> { try{Thread.sleep(4000);}catch(Exception ex){}; 
     System.out.println(x + " " + Thread.currentThread().getId());}); 
    } 

    // artificial representations of IO work 
    static Observable<Integer> getDataAsync(int i) { 
     return getDataSync(i).subscribeOn(Schedulers.io()); 
    } 

    static Observable<Integer> getDataSync(int i) { 
     return Observable.create((Subscriber<? super Integer> s) -> { 
      // simulate latency 
      try { 
       Thread.sleep(1000); 
      } catch (Exception e) { 
       e.printStackTrace(); 
      } 
      s.onNext(i); 
      s.onCompleted(); 
     }); 
    } 
} 

在上述例子中我们使用的可观察的subscribeOn方法以及提供线程池(Schedules.io)用于执行,所以每个Observable的订阅将在单独的线程上发生。

Thread.sleep有可能锁定线程之间的任何共享对象,但我仍然不清楚它。请帮忙。

+0

你怎么知道源并不平行运行? – akarnokd

+0

我知道这是通过打印当前正在执行的线程ID。 –

回答

2

实际上,在您的示例中,并行执行是发生的,您只是错误地查看它,执行工作的位置和发出通知的位置之间存在差异。

如果您将线程ID为Observable.create的日志放在日志中,您会注意到每个Observable都是在不同的线程中同时执行的。但通知是连续发生的。这种行为与Observable契约的一部分预期的一样,即观察者必须连续地(不平行地)向观察者发出通知。

相关问题