我相信新的RxJava并试图从链路执行用于多个观测量并行执行的例子: RxJava Fetching Observables In Parallel为什么并行执行不会发生多个RXJava Observables?
尽管在上面的链接提供的示例中被并行执行观测,但是当我加一根线。睡眠(TIME_IN_MILLISECONDS)在forEach方法中,那么系统一次开始执行一个Observable。请帮我理解为什么Thread.sleep会停止并行执行Observables。
下面是变形例这是造成多个观测的同步执行:
import rx.Observable;
import rx.Subscriber;
import rx.schedulers.Schedulers;
public class ParallelExecution {
public static void main(String[] args) {
System.out.println("------------ mergingAsync");
mergingAsync();
}
private static void mergingAsync() {
Observable.merge(getDataAsync(1), getDataAsync(2)).toBlocking()
.forEach(x -> { try{Thread.sleep(4000);}catch(Exception ex){};
System.out.println(x + " " + Thread.currentThread().getId());});
}
// artificial representations of IO work
static Observable<Integer> getDataAsync(int i) {
return getDataSync(i).subscribeOn(Schedulers.io());
}
static Observable<Integer> getDataSync(int i) {
return Observable.create((Subscriber<? super Integer> s) -> {
// simulate latency
try {
Thread.sleep(1000);
} catch (Exception e) {
e.printStackTrace();
}
s.onNext(i);
s.onCompleted();
});
}
}
在上述例子中我们使用的可观察的subscribeOn方法以及提供线程池(Schedules.io)用于执行,所以每个Observable的订阅将在单独的线程上发生。
Thread.sleep有可能锁定线程之间的任何共享对象,但我仍然不清楚它。请帮忙。
你怎么知道源并不平行运行? – akarnokd
我知道这是通过打印当前正在执行的线程ID。 –