2017-06-16 68 views
0

我有一个长时间运行的任务(说一个Observable<Integer>),我想在应用程序中尽可能多次触发。我对处理以各种方式发送的事件的任务有多个“视图”。我的整个应用程序中只有一个subscribe如何控制RxJava 2中的订阅计数?

如何确保长时间运行的任务仅针对每个订阅触发一次,并且仅在订阅需要时触发?

为了让事情变得更具体,这里是一个单元测试:

@Test 
public void testSubscriptionCount() { 

    final Counter counter = new Counter(); 

    // Some long running tasks that should be triggered once per subscribe 
    final Observable<Integer> a = Observable.just(1, 2, 3, 4, 5) 
     .doOnSubscribe(subscription -> { 
      counter.increment(); 
     }); 

    // Some "view" on the long running task 
    final Observable<Integer> b = a.filter(x -> x % 2 == 0); 

    // Another "view" on the long running task 
    final Observable<Integer> c = a.filter(x -> x % 2 == 1); 

    // A view on the views 
    final Observable<Integer> d = Observable.zip(b, c, (x, y) -> x + y); 

    d.toList().blockingGet(); 

    assertEquals(1, counter.count); // Fails, counter.count == 2 
} 

我想a当其意见(bcd)一个订阅才能触发,但每次订阅也只有一次。

在上面的代码中,订阅发生两次(我认为d触发bc,这两个都独立触发a)。


添加.share()没有解决不了的问题(虽然我认为这是沿着正确的路线):

// Some long running tasks that should be triggered once per subscribe 
    final Observable<Integer> a = Observable.just(1, 2, 3, 4, 5) 
     .doOnSubscribe(subscription -> counter.increment()) 
     .share(); 

java.lang.AssertionError:

Expected :1

Actual :2

+1

我不清楚“只在需要时触发”这个短语的意思。你有单元测试吗? –

+0

我的意思是observable只能在有人直接订阅它或者从它构建的其他observable时才运行它的发射器代码。 – sdgfsdh

+0

你所描述的是观察者的工作方式。你有什么理由相信他们不这样工作? –

回答

0

如果你的目标是防止多次执行,当观察者订阅平行,.share()是你在找什么:

Observable<Integer> shared = source.share(); 

// In thread 1: 

shared.subscribe(...); 

// In thread 2: 

shared.subscribe(...); 

只要源观察值在第二次订阅发生时尚未完成,它将收到与第一次相同的结果,并且不会强制源观察值的另一次执行。

RxJava文档有更详细的解释,但它基本上是一个包装器,它有一些引用计数,只有在需要时才会订阅源可观察值以避免并发执行。

同时请记住,时间将扮演实际交付价值的重要部分。我不相信.share()会做任何特定的元素缓冲,所以如果元素在第二次订阅之前交付,第二次订阅不会获得这些元素。您必须使用.buffer()或其他一些方法来保留晚期订户的结果。

+0

我认为'.share()'是正确的想法,但它仍然给我两个订阅(见编辑)。 – sdgfsdh

+0

是的,它会给你两个订阅,因为你的操作不是长时间运行。如果在.doOnSubscribe之前添加一个.delay(1L,TimeUnit.SECONDS),它完全符合你的要求。共享旨在防止并发执行。在你的情况下,它会很快完成两次执行的共享。 – Matt

+0

我明白了。有没有像'share'这样的变体,但是在第一次触发后没有再次订阅? – sdgfsdh