2016-08-12 54 views
2

我一直读了几文档的背压RxJava,但我不能找到喜欢它是如何在库内部发生的详细解释,每个人都只是概括它像“生产者”是太快, “消费者”太慢了。如何背压RxJava内部发生

例如像下面的代码:

Observable.interval(1, TimeUnit.MILLISECONDS) 
    .observeOn(Schedulers.newThread()) 
    .subscribe(
     i -> { 
      System.out.println(i); 
      try { 
       Thread.sleep(100); 
      } catch (Exception e) { } 
     }, 
     System.out::println); 

我一直经历的RxJava源代码,所以我的理解是,在主线程中,我们要发出的每毫秒的事件,一旦我们发出它,我们通过将值的System.out.println(我)方法,并把它扔进了newThead调度的线程池,运行里面一个可运行的方法。

所以我的问题是,如何将内部例外发生?原因当我们调用Thread.sleep()方法,我们只是在睡觉,处理方法调用线程 - >的System.out.println(),而不在线程池中影响其他线程,为什么会导致异常。是否因为线程池不再有足够的可用线程?

感谢

回答

3

你能想到的背压作为许可证制度一个操作员的手了其上游源:你可以给我128元。稍后,该运营商可能会说“好吧,再给我96个”,因此总共可以有224个未完成的许可证。有些来源,如interval不关心许可证,只是定期递交价值观。由于许可证的数量通常是强烈依赖于可用容量在队列或缓存器,除了这些存储器端出更可容纳产量MissingBackpressureException

检测到背压违规主要发生在有限队列的offer返回false时,例如observeOn中的指示队列已满。

检测违反是由操作者跟踪的突出允许计数的第二种方式,例如onBackpressureDrop并且每当向上游发送比这更多的,操作者只需将不转发它:

// in onBackpressureDrop 
public void onNext(T value) { 
    if (emitted != availablePermits) { 
     emitted++; 
     child.onNext(value); 
    } else { 
     // ignoring this value 
    } 
} 

的儿童用户信号经由请求()其允许通常导致像这样在onBackpressureDrop

public void childRequested(long n) { 
    availablePermits += n; 
} 

在实践中,由于可能异步执行,availablePermits是一个ÑAtomicLong(并且被称为requested)。

+0

您是说RxJava中的一些运算符会将onNext()事件放入队列或缓冲区数据结构中吗?所以这个异常不是由于线程池所致;)? – Qing