我有一个长时间运行的任务(说一个Observable<Integer>
),我想在应用程序中尽可能多次触发。我对处理以各种方式发送的事件的任务有多个“视图”。我的整个应用程序中只有一个subscribe
。如何控制RxJava 2中的订阅计数?
如何确保长时间运行的任务仅针对每个订阅触发一次,并且仅在订阅需要时触发?
为了让事情变得更具体,这里是一个单元测试:
@Test
public void testSubscriptionCount() {
final Counter counter = new Counter();
// Some long running tasks that should be triggered once per subscribe
final Observable<Integer> a = Observable.just(1, 2, 3, 4, 5)
.doOnSubscribe(subscription -> {
counter.increment();
});
// Some "view" on the long running task
final Observable<Integer> b = a.filter(x -> x % 2 == 0);
// Another "view" on the long running task
final Observable<Integer> c = a.filter(x -> x % 2 == 1);
// A view on the views
final Observable<Integer> d = Observable.zip(b, c, (x, y) -> x + y);
d.toList().blockingGet();
assertEquals(1, counter.count); // Fails, counter.count == 2
}
我想a
当其意见(b
,c
或d
)一个订阅才能触发,但每次订阅也只有一次。
在上面的代码中,订阅发生两次(我认为d
触发b
和c
,这两个都独立触发a
)。
添加.share()
没有解决不了的问题(虽然我认为这是沿着正确的路线):
// Some long running tasks that should be triggered once per subscribe
final Observable<Integer> a = Observable.just(1, 2, 3, 4, 5)
.doOnSubscribe(subscription -> counter.increment())
.share();
java.lang.AssertionError:
Expected :1
Actual :2
我不清楚“只在需要时触发”这个短语的意思。你有单元测试吗? –
我的意思是observable只能在有人直接订阅它或者从它构建的其他observable时才运行它的发射器代码。 – sdgfsdh
你所描述的是观察者的工作方式。你有什么理由相信他们不这样工作? –