2017-06-21 94 views
4

最近我意识到我不明白RxJava2背压是如何工作的。RxJava2可观测背压

我做了小测试,我希望它应该会失败,MissingBackpressureException例外:

@Test 
public void testBackpressureWillFail() { 
    Observable.<Integer>create(e -> { 
     for (int i = 0; i < 10000; i++) { 
      System.out.println("Emit: " + i); 
      e.onNext(i); 
     } 
     e.onComplete(); 
    }) 
    .subscribeOn(Schedulers.newThread()) 
    .observeOn(Schedulers.computation()) 
    .doOnNext(i -> { 
     Thread.sleep(100); 
     System.out.println("Processed:" + i); 
    }) 
    .blockingSubscribe(); 
} 

系统显示出未来:

Emit: 0 
Emit: 1 
Emit: 2 
... 
Emit: 10000 

Processed:0 
Processed:1 
Processed:2 
... 
Processed:10000 

为什么它不产生MissingBackpressureException

我希望e.onNext(i);将把项目进入缓冲区的ObservableObserveOn和之后的尺寸大于static final int BUFFER_SIZE = Math.max(16,Integer.getInteger("rx2.buffer-size",128).intValue());

应该抛出MissingBackpressureException这不会发生。缓冲区是否自动增长?如果没有物品存储在哪里?

+1

'Observable's在RxJava2不支持背压,只有'Flowable's做 –

+1

我知道,他们不支持backpreassure,但我认为不支持意味着MissingBackpressureException会抛出,不是aut o增长的缓冲区。 –

回答

3

这是因为背压仅在RxJava2中移出Flowable,请参阅here
如果您将切换到FlowableBackpressureStrategy.MISSING你会得到例外。
这也意味着,在你的情况,你确实有缓冲可以自动增长, 从observerOn文档:

修改的ObservableSource用于在指定调度执行其排放量和通知,异步与无限的缓冲。 ..

+0

谢谢,请你解释一下,如果缓冲区是无界的,Observable中的Observable observeOn(Scheduler scheduler,boolean delayError,int bufferSize)是什么原因?我确定缓冲区的大小会增长吗? –

+0

但我认为,根据文档,这是缓冲区增量步骤的“岛”的可配置大小。 – yosriz