我试图在4核心机器上执行以下代码。我在池中有5个线程,在map运算符中,我将执行线程休眠几秒钟。当当前正在执行的线程进入等待状态时,runOn()方法为什么没有在池中的下一个可用线程上执行映射操作符?
我想到的是,核心将放在执行的线程睡眠,当一个事件是可用的,应该从线程池的下一个可用线程中执行的地图操作,而不是我看到的行为。
我看到池中的4个线程继续等待13秒,并且只有等待完成后才能处理下一个事件。
为什么runOn()
方法在线程进入等待状态时不对池中的下一个可用线程执行映射操作符?
我使用的反应堆核版“3.0.7.RELEASE”
CountDownLatch latch = new CountDownLatch(10);
ExecutorService executorService = Executors.newFixedThreadPool(5);
Flux<Integer> flux = Flux.just(1, 2, 3, 4, 5, 6, 7, 8, 9, 10);
flux.parallel()
.runOn(Schedulers.fromExecutorService(executorService))
.map(l -> {
Logger.log(ReactorParallelTest.class, "map1", "inside run waiting for 13 seconds");
try {
Thread.sleep(13000);
} catch (InterruptedException e) {
e.printStackTrace();
}
Logger.log(ReactorParallelTest.class, "map1", "l=" + l);
latch.countDown();
return l;
}).subscribe(l -> {
Logger.log(ReactorParallelTest.class, "onNext", "l=" + l);
}, error -> System.err.println(error),
() -> {
Logger.log(ReactorParallelTest.class, "onComplete", "inside complete.");
executorService.shutdown();
});
try {
latch.await(60, TimeUnit.SECONDS);
} catch (InterruptedException e) {
e.printStackTrace();
}
感谢您的回复。 我看到flatMap + subscribeOn在线程之间切换(并且处理来自发布者的新传入事件),即使执行线程进入休眠状态。 你觉得,如果我们进行任何阻塞操作(如调用一个服务/数据库),我们使用执行subscribeOn()它flatMap而不是使用地图和里面的RunOn? –