2017-09-03 265 views
2

我用下面的任务试验,以让我的头周围RxJava的简单的例子:rxjava2 - 在一个线程池执行任务,订阅在单个线程

  • 给出的URL列表
  • 待办事项用于在线程池
  • 对于每个结果每个URL一个HTTP请求中插入一些数据到SQLite数据库(这里没有多线程)
  • 块中的方法,直到它完成

所以,我想它在科特林:

val ex = Executors.newFixedThreadPool(10) 
Observable.fromIterable((1..100).toList()) 
    .observeOn(Schedulers.from(ex)) 
    .map { Thread.currentThread().name } 
    .subscribe { println(it + " " + Thread.currentThread().name } 

我希望它打印

pool-1-thread-1 main 
pool-1-thread-2 main 
pool-1-thread-3 main 
pool-1-thread-4 main 
.... 

但是它打印:

pool-1-thread-1 pool-1-thread-1 
pool-1-thread-1 pool-1-thread-1 
pool-1-thread-1 pool-1-thread-1 

任何人都可以纠正我关于如何工作的误解?为什么它不使用线程池的所有线程?我如何让我的订阅者在主线程上运行或阻塞直到完成?

回答

3

Rx并不是指平行执行服务,因此使用Java的流API。 Rx事件是同步的,随后将流过流。 observeOn在构建流时会请求线程一次,并在该线程上逐个处理排放。

您还希望subscribe在主线程上执行。 observeOn切换线程,并在该线程上发生所有下游事件。如果您想切换到主线程,则必须在subscribe之前插入另一个observeOn

1

使代码并行的map块里面工作,你应该把它换到可观察到的与自己的调度:

val ex = Executors.newFixedThreadPool(10) 
    val scheduler = Schedulers.from(ex) 
    Observable.fromIterable((1..100).toList()) 
      .flatMap { 
       Observable 
         .fromCallable { Thread.currentThread().name } 
         .subscribeOn(scheduler) 
      } 
      .subscribe { println(it + " " + Thread.currentThread().name) } 

在这种情况下,你会看到的结果是:

pool-1-thread-1 pool-1-thread-1 
pool-1-thread-2 pool-1-thread-1 
pool-1-thread-3 pool-1-thread-1 
pool-1-thread-4 pool-1-thread-1 
... 

你可以检查文章RxJava - Achieving Parallelization,给出这种行为的解释。

另外,RxJava 2.0.5引入ParallelFlowable API

相关问题