2016-11-14 74 views
2

此代码RXJava份额()不Observable.create工作()

public class ConnectObs { 
public static void main(String[] args) { 

    Observable<Integer> intsObservable = Observable.just(1, 2); 
    intsObservable = intsObservable.share(); 

    intsObservable.subscribe(s->System.out.println("A " + s)); 
    intsObservable.subscribe(s->System.out.println("B " + s)); 

    intsObservable = Observable.create(s -> { 
     s.onNext(1); 
     s.onNext(2); 
    }); 
    intsObservable = intsObservable.share(); 

    intsObservable.subscribe(s->System.out.println("C " + s)); 
    intsObservable.subscribe(s->System.out.println("D " + s)); 
    } 
} 

产生用于A,B和C的结果,但不是为d - 这是为什么?

结果如下:

A 1 
A 2 
B 1 
B 2 
C 1 
C 2 

回答

2

Observable.just和您的自定义可观测(其不被方式安全标准建造)之间的重要区别是,你这样C认购仍处于活动状态时,没有完成的流D订阅发生,因此D只是等待更多的未来排放量。

你的创作应该是这样的:

Observable.<Integer> create(s -> { 
     s.onNext(1); 
     s.onNext(2); 
     s.onCompleted(); 
}) 
//prevent MissingBackpressureException 
.onBackpressureBuffer(); 

有那么一点友好的用户您可以添加unsubscribe检查过:

Observable.<Integer> create(s -> { 
     s.onNext(1); 
     if (!s.isUnsubscribed()) 
      s.onNext(2); 
     if (!s.isUnsubscribed()) 
      s.onCompleted(); 
}).onBackpressureBuffer();