2017-06-21 68 views
2

比方说,我有一个sequence这样的:RxJS - 重试或重置

Rx.Observable 
.interval(1000) 
.subscribe(data => {console.log(data)}) 

随着operators,我怎么能 'restart' 的顺序,意味着unsubscriberesubscribe

真正的情况是,sequence是一个套接字流,当我们需要unsubscriberesubscribe,有点像retryWhen(errors)作品,但不能有错误一定的条件......在理想情况下是这样的... retryWhen(bool:Subject)

回答

2

我会这样做使用switchMap(),因为它会自动退订旧的Observable并订阅新的。在这种情况下我们只用.switchMap(() => source)

const subject = new Subject(); 

const source = Observable.create(obs => { 
    console.log('Observable.create'); 
    obs.next(42); 
}); 

subject.switchMap(() => source) 
    .subscribe(v => console.log('next:', v)); 


setTimeout(() => subject.next(), 1000); 
setTimeout(() => subject.next(), 5000); 

此打印如下:

Observable.create 
next: 42 
Observable.create 
next: 42 

只是代替source你有你的WebSocket源(或任何你有)。

+0

感谢这个解决方案,但是如何在没有主题的情况下做到这一点?那可能吗? – Thibs