我有一个Observable
,我想要冷,也就是说,它应该只在第一个观察者订阅它时才开始发射物品。所有观察员都退订Observable是否有事件?
然后,我想确保在所有观察者都取消订阅同一个observable时从源中释放所有资源。那可能吗?
我有一个Observable
,我想要冷,也就是说,它应该只在第一个观察者订阅它时才开始发射物品。所有观察员都退订Observable是否有事件?
然后,我想确保在所有观察者都取消订阅同一个observable时从源中释放所有资源。那可能吗?
您可以使用ConnectedObservable的功能来处理这个要求:
//Replace Observable.range(1,1000) with your Observable implementation
Observable.range(1, 1000).doOnUnsubscribe(() -> freeResources()).share();
share
方法调用方法publish
和refCount
。
publish
将您的“普通”Observable转换为ConnectedObservable,当您拨打connect
时,它将开始发射物品。因此,您可以在技术上订阅尽可能多的观察者,然后致电connect
同时开始为他们全部发布物品。
refCount
将您的ConnectedObservable再次转换回传统的,但具有新的特点!附加的好处是:这个可观察的现在是冷的(只有当用户订阅时才开始发射,在内部它调用publish
创建的原始ConnectedObservable的connect
方法),并跟踪有多少用户连接到原始ConnectedObservable。一旦所有订户都取消订阅,它将从源代码ConnectedObservable中取消绑定,因此逻辑变得更简单,因为您只需处理一个订阅。
没有为共享操作这里的好图:http://reactivex.io/RxJava/javadoc/rx/Observable.html#share()
或者,如果这是不够灵活,我想你应该能够实现这种行为很容易为了使用defer
创建冷观察到,以及doOnSubscribe
和doOnUnsubscribe
方法。
例子:
Observable.defer(() -> {
final AtomicInteger counter = new AtomicInteger();
return Observable.range(1, 1000)
.doOnSubscribe(() -> counter.incrementAndGet())
.doOnUnsubscribe(() -> {
if (counter.decrementAndGet() == 0) {
freeResources();
}
});
});
此观察到将开始发射数字序列(与你观察的实现来替代这一点)只要第一用户订阅,它会增加与每个订阅一个计数器,并释放一旦所有订阅者都取消订阅,就会使用资源(取代freeResources以满足您的任何需求)。