2017-06-15 117 views
0

不会不工作的方法,我试图整合阻挡消费者在反应堆铝SR1一个通量用户。我想使用并行的调度程序来同时执行阻塞操作。通量的publishOn预期

我已经实现一个主类来描述我的意图:

package etienne.peiniau; 

import org.reactivestreams.Subscriber; 
import org.reactivestreams.Subscription; 
import reactor.core.publisher.Flux; 
import reactor.core.scheduler.Schedulers; 
import reactor.util.function.Tuple2; 

public class Main { 

    public static void main(String[] args) throws InterruptedException { 
     Flux.just(1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16, 17, 18, 19, 20) 
       .elapsed() 
       .publishOn(Schedulers.parallel()) 
       .subscribe(new Subscriber<Tuple2<Long, Integer>>() { 
        @Override 
        public void onSubscribe(Subscription subscription) { 
         System.out.println("[" + Thread.currentThread().getName() + "] Subscription"); 
         subscription.request(Long.MAX_VALUE); 
        } 

        @Override 
        public void onNext(Tuple2<Long, Integer> t2) { 
         System.out.println("[" + Thread.currentThread().getName() + "] " + t2); 
         try { 
          Thread.sleep(1000); // long operation 
         } catch (InterruptedException e) { 
          e.printStackTrace(); 
         } 
        } 

        @Override 
        public void onError(Throwable throwable) { 
         System.err.println("[" + Thread.currentThread().getName() + "] Error: " + throwable.getMessage()); 
        } 

        @Override 
        public void onComplete() { 
         System.out.println("[" + Thread.currentThread().getName() + "] Complete"); 
        } 
       }); 
     // Waiting for the program to complete 
     System.out.println("[" + Thread.currentThread().getName() + "] Main"); 
     Thread.sleep(100000); 
    } 

} 

这段代码的输出如下:

[main] Subscription 
[main] Main 
[parallel-1] [3,1] 
[parallel-1] [1000,2] 
[parallel-1] [1001,3] 
[parallel-1] [1000,4] 
[parallel-1] [1000,5] 
[parallel-1] [1000,6] 
[parallel-1] [1001,7] 
[parallel-1] [1000,8] 
[parallel-1] [1000,9] 
[parallel-1] [1000,10] 
[parallel-1] [1000,11] 
[parallel-1] [1001,12] 
[parallel-1] [1000,13] 
[parallel-1] [1000,14] 
[parallel-1] [1000,15] 
[parallel-1] [1000,16] 
[parallel-1] [1001,17] 
[parallel-1] [1000,18] 
[parallel-1] [1000,19] 
[parallel-1] [1000,20] 
[parallel-1] Complete 

我的问题是,长时间操作总是执行上线程并行-1和每1秒。

我试着手动增加并行度或使用弹性调度程序,但结果是一样的。

我在想,publishOn方法用于该用途的情况下,被专门设计的。你能否告诉我我是否误解了某些东西?

回答

2

其实它按预期工作,你可以看到,凡在并行处理所有的值 - 经过时间几乎是相同的,但你总是在同一个线程内收到的元素,这样,每次等待1秒的时间。

我想,在简单Flux并行并不意味着更多的线程,这意味着在并行做的工作。如果你喜欢例如运行代码:

Flux.fromIterable(IntStream.range(0, 20).boxed().collect(Collectors.toList())) 
      .map(i -> { 
       System.out.println("map [" + Thread.currentThread().getName() + "] " + i); 
       return i; 
      }) 
      .elapsed() 
      .publishOn(Schedulers.single()) 
      .subscribeOn(Schedulers.single()) 
      .subscribe(t2 -> { 
       System.out.println("subscribe [" + Thread.currentThread().getName() + "] " + t2); 
      }); 

你会看到的结果:

map [single-1] 0 
map [single-1] 1 
... 
subscribe [single-1] [4,0] 
subscribe [single-1] [0,1] 
... 

而且你可以看到,首先它做map所有元素,然后consume他们。但是,如果你改变例如.publishOn(Schedulers.parallel())您将看到:

map [single-1] 3 
subscribe [parallel-1] [5,0] 
map [single-1] 4 
subscribe [parallel-1] [0,1] 
map [single-1] 5 
... 

所以这一次在做并行线程两种操作。我不确定我是否正确理解一切。

有并行执行特定ParallelFlux。在下面的例子一切都将在不同的线程来完成

Flux.fromIterable(IntStream.range(0, 20).boxed().collect(Collectors.toList())) 
     .elapsed() 
     .parallel() 
     .runOn(Schedulers.parallel()) 
     .subscribe(t2 -> { 
      System.out.println("[" + Thread.currentThread().getName() + "] " + t2); 
      try { 
       Thread.sleep(1000); // long operation 
      } catch (InterruptedException e) { 
       e.printStackTrace(); 
      } 
     }, throwable -> { 
      System.err.println("[" + Thread.currentThread().getName() + "] Error: " + throwable.getMessage()); 
     },() -> { 
      System.out.println("[" + Thread.currentThread().getName() + "] Complete"); 
     }, subscription -> { 
      System.out.println("[" + Thread.currentThread().getName() + "] Subscription"); 
      subscription.request(Long.MAX_VALUE); 
     }); 

结果是这样的:

[parallel-1] [8,0] 
[parallel-2] [0,1] 
[parallel-3] [0,2] 
[parallel-4] [0,3] 
[parallel-1] [0,4] 
... 

所以它使用几个线程来处理结果。从我的角度来看,它确实是平行的。

还要注意的是,如果你使用的方法.subscribe(Subscriber<? super T> s)所有结果将在连续的方式被消耗掉,并使用它们并行你应该使用:

public void subscribe(Consumer<? super T> onNext, Consumer<? super Throwable> 
      onError, Runnable onComplete, Consumer<? super Subscription> onSubscribe) 

或任何其他重载方法与Consumer<? super T> onNext,...参数

+0

@etiennepeiniau我猜在简单的Flux中,它不是“并行==更多的线程”。我将编辑我的答案并添加一个示例。 –