2017-08-15 69 views
0

我知道这是因为这里desribedrxJs主题,在订阅错误杀死整个流

基本问题在上面的链接描述,但这里是有关一个已知的行为代码(取自链接)

// This is going to behave strangely 
const source$ = Observable.interval(1000).share(); 
const mapped$ = source$.map(x => { 
    if (x === 1) { 
    throw new Error('oops'); 
    } 
    return x; 
}); 
source$.subscribe(x => console.log('A', x)); 
mapped$.subscribe(x => console.log('B', x)); 
source$.subscribe(x => console.log('C', x)); 
// "A" 0 
// "B" 0 
// "C" 0 
// "A" 1 
// Uncaught Error: "oops" 

订阅中的错误将终止整个源码流。

可观察的解决方案是使用.observeOn(Rx.Scheduler.asap);

我是相当新的整体反应式编程,我挣扎着该解决方案适用于我的Subject因为主体不支持observeOn

但我需要一个Subject因为我需要推动新的价值蒸汽。

如何解决此问题或使用observeOnSubject

observeOn返回Observable。但我努力如何将observeOn与我的Subject相结合。 如何使用observeOn并仍然能够将值推送到我的主题?

这里是当前代码(简体)

export class MyClass{ 

    private messages: Subject<Message> = new Subject<Message>(); 

    dispatchMessage(message: Message) { 

    this.messages.next(message); 
} 

想法?

P.S.

对于任何人使用角(像我),observeOn可能会有一些不希望的副作用。 https://github.com/angular/angular/issues/14316

就像任何人提到这个问题的附加信息一样。

+0

解决方法什么问题? 'observeOn'只是一个存在于'Subject'类的运算符。 – martin

+0

对不起,我以某种方式混淆了它。我苦于'observeOn'返回一个'Observable'而不是'Subject'。 我编辑的问题 – Arikael

回答

0

在你只需要一个单独的参考Subject以及对你追加observeOn操作后,链的引用情况:

const subject$ = new Subject(); 
const obs$ = subject$.observeOn(...); 

obs$.subscribe(...); 
subject$.next(...); 
+0

这么简单,我觉得有点愚蠢...... – Arikael