2017-07-25 27 views
1

我试图在4核心机器上执行以下代码。我在池中有5个线程,在map运算符中,我将执行线程休眠几秒钟。当当前正在执行的线程进入等待状态时,runOn()方法为什么没有在池中的下一个可用线程上执行映射操作符?

我想到的是,核心将放在执行的线程睡眠,当一个事件是可用的,应该从线程池的下一个可用线程中执行的地图操作,而不是我看到的行为。

我看到池中的4个线程继续等待13秒,并且只有等待完成后才能处理下一个事件。

为什么runOn()方法在线程进入等待状态时不对池中的下一个可用线程执行映射操作符?

我使用的反应堆核版“3.0.7.RELEASE”

CountDownLatch latch = new CountDownLatch(10); 

ExecutorService executorService = Executors.newFixedThreadPool(5); 

Flux<Integer> flux = Flux.just(1, 2, 3, 4, 5, 6, 7, 8, 9, 10); 
flux.parallel() 
.runOn(Schedulers.fromExecutorService(executorService)) 
.map(l -> { 
    Logger.log(ReactorParallelTest.class, "map1", "inside run waiting for 13 seconds"); 
    try { 
     Thread.sleep(13000); 
    } catch (InterruptedException e) { 
     e.printStackTrace(); 
    } 
    Logger.log(ReactorParallelTest.class, "map1", "l=" + l); 
    latch.countDown(); 
    return l; 
}).subscribe(l -> { 
    Logger.log(ReactorParallelTest.class, "onNext", "l=" + l); 
}, error -> System.err.println(error), 
     () -> { 
      Logger.log(ReactorParallelTest.class, "onComplete", "inside complete."); 
      executorService.shutdown(); 
     }); 

try { 
    latch.await(60, TimeUnit.SECONDS); 
} catch (InterruptedException e) { 
    e.printStackTrace(); 
} 

回答

1

您是阻止这种代码的所有轨道。将启动4个导轨(CPU数量),并且它们将立即从源请求1个元素。由于您完成此操作后立即阻止map,导轨无法从上游请求更多,因此实际上一次只能获得4个元素,阻止,获取更多,阻止...并行性比容量更有限线程池。如果你想把所有的线程都使用得很好,请执行.parallel(5)(与线程池相同的配置)。

请注意,从ParallelFluxsubscribe(lambda)将为每个导轨调用onComplete回调。如果要合并回单个序列(和单个完整序列),请在.subscribe之前使用.sequential()

+0

感谢您的回复。 我看到flatMap + subscribeOn在线程之间切换(并且处理来自发布者的新传入事件),即使执行线程进入休眠状态。 你觉得,如果我们进行任何阻塞操作(如调用一个服务/数据库),我们使用执行subscribeOn()它flatMap而不是使用地图和里面的RunOn? –

相关问题