2017-10-04 192 views
0

我有简单的流量Flux.generate与消费者和并行

Flux<Long> flux = Flux.generate(
      AtomicLong::new, 
      (state, sink) -> { 
       long i = state.getAndIncrement(); 
       sink.next(i); 
       if (i == 3) sink.complete(); 
       return state; 
      }, (state) -> System.out.println("state: " + state)); 

如预期在单个线程其中一期工程:

flux.subscribe(System.out::println); 

输出是

0 1 2 3 state: 4 

但是,当我切换到并联:

flux.parallel().runOn(Schedulers.elastic()).subscribe(System.out::println); 

消费者应该打印状态:号码不被调用。我只看到:

0 3 2 1 

它是一个错误或预期的功能?

回答

1

我不是一个被动的专家,但在挖掘了源代码之后,似乎这种行为是通过设计的;似乎创建一个ParallelFlux具有阻止州消费者呼叫的副作用;如果你想要平行并得到国家消费者援引,你可以使用:

flux.publishOn(Schedulers.elastic()).subscribe(System.out::println); 
+0

明白了。但是,当你创建一个Flux并期望得到一些结果但在代码中的某个地方使用它会改变这种行为时,这种奇怪的行为。 –