我一直读了几文档的背压RxJava,但我不能找到喜欢它是如何在库内部发生的详细解释,每个人都只是概括它像“生产者”是太快, “消费者”太慢了。如何背压RxJava内部发生
例如像下面的代码:
Observable.interval(1, TimeUnit.MILLISECONDS)
.observeOn(Schedulers.newThread())
.subscribe(
i -> {
System.out.println(i);
try {
Thread.sleep(100);
} catch (Exception e) { }
},
System.out::println);
我一直经历的RxJava源代码,所以我的理解是,在主线程中,我们要发出的每毫秒的事件,一旦我们发出它,我们通过将值的System.out.println(我)方法,并把它扔进了newThead调度的线程池,运行里面一个可运行的方法。
所以我的问题是,如何将内部例外发生?原因当我们调用Thread.sleep()方法,我们只是在睡觉,处理方法调用线程 - >的System.out.println(),而不在线程池中影响其他线程,为什么会导致异常。是否因为线程池不再有足够的可用线程?
感谢
您是说RxJava中的一些运算符会将onNext()事件放入队列或缓冲区数据结构中吗?所以这个异常不是由于线程池所致;)? – Qing