2016-09-06 64 views

回答

4

您可以使用ConnectedObservable的功能来处理这个要求:

//Replace Observable.range(1,1000) with your Observable implementation 
Observable.range(1, 1000).doOnUnsubscribe(() -> freeResources()).share(); 

share方法调用方法publishrefCount

publish将您的“普通”Observable转换为ConnectedObservable,当您拨打connect时,它将开始发射物品。因此,您可以在技术上订阅尽可能多的观察者,然后致电connect同时开始为他们全部发布物品。

refCount将您的ConnectedObservable再次转换回传统的,但具有新的特点!附加的好处是:这个可观察的现在是冷的(只有当用户订阅时才开始发射,在内部它调用publish创建的原始ConnectedObservable的connect方法),并跟踪有多少用户连接到原始ConnectedObservable。一旦所有订户都取消订阅,它将从源代码ConnectedObservable中取消绑定,因此逻辑变得更简单,因为您只需处理一个订阅。

没有为共享操作这里的好图:http://reactivex.io/RxJava/javadoc/rx/Observable.html#share()

或者,如果这是不够灵活,我想你应该能够实现这种行为很容易为了使用defer创建冷观察到,以及doOnSubscribedoOnUnsubscribe方法。

例子:

Observable.defer(() -> { 
     final AtomicInteger counter = new AtomicInteger(); 
     return Observable.range(1, 1000) 
       .doOnSubscribe(() -> counter.incrementAndGet()) 
       .doOnUnsubscribe(() -> { 
        if (counter.decrementAndGet() == 0) { 
         freeResources(); 
        } 
       }); 
    }); 

此观察到将开始发射数字序列(与你观察的实现来替代这一点)只要第一用户订阅,它会增加与每个订阅一个计数器,并释放一旦所有订阅者都取消订阅,就会使用资源(取代freeResources以满足您的任何需求)。