2017-03-16 103 views
1

我期待合并流和刷新所有用户。RxJs合并流

对于函数式编程而言,concat不会更新当前的observable,我不明白正确的方法。

这我怎么想的事情发生:

var observable = Rx.Observable.from(['stream1']); 
var subscription = observable.subscribe(
    function(x) { 
     console.log('Next: ' + x); 
    }, 
    function(err) { 
     console.log('Error: ' + err); 
    }, 
    function() { 
     console.log('Completed'); 
    }); 
observable.concat(Rx.Observable.from(['stream2'])); 

此代码仅读取第一个数据流,然后去完成。 当concat它创建一个新的Observable,我真的不想要,因为我已经订阅了第一个。

什么是正确的做法,因为我甚至无法推入第一个可观察的?

谢谢!

回答

2

你所要做的是不完全可能这样,但你基本上有两个选择去,既涉及Subject的用法:

1)数据的人工发射

const obs$ = Rx.Observable.of("stream1"); 
const subj$ = new Rx.Subject(); 

Rx.Observable.merge(obj$, subj$) 
    .subscribe(
     x => console.log('Next: ' + x), 
     x => console.log('Error: ' + x), 
     () => console.log('Complete') 
    ); 

subj$.next("stream2"); 
subj$.next("stream3"); 

但是:在这种情况下,完整将永远不会被调用,因为Subject永远不会自行完成 - 所以如果您需要触发您的complete处理程序,则必须添加手册subj$.complete();到最后。


2)多播通过主题

const obs$ = Rx.Observable.of("stream1"); 
const subj$ = new Rx.Subject(); 

subj$.subscribe(
    x => console.log('Next: ' + x), 
    x => console.log('Error: ' + x), 
    () => console.log('Complete') 
); 

obs$.subscribe(x => subj$.next(x)); 
const obs2$ = Rx.Observable.of("stream2"); 
obs2$.subscribe(x => subj$.next(x)); 

在这种情况下的Subject将基本上充当“代理”只会传播的数据,但没有无差错或完全的触发器。

这两种解决方案都不是很“好” - 但也许你可以更好地概述你的用例,我相信有一个适当的解决方案,它不涉及任何复杂的解决方法。


如果你只是想有一种方式来继续提供数据形成永久观察的,你应该使用BehaviorSubject - 它工作的方式,你可以在它发出的数据并订阅它在同时:

class Service { 
    public data$ = new BehaviorSubject(someInitialDataOrNull); 

    public getData() { 
     makeSomeHttpCall() 
      .subscribe(data => data$.next(data)); 
    } 
} 

class Component { 
    constructor() { 
     theService.data$.subscribe(data => console.log(data)); 
    } 
} 

这里是对BehaviorSubject的旧文档的链接(它基本上还是以同样的方式期待onNextnext现在,等...)

+0

非常感谢您的回答。我的想法是,服务将提供来自api或其他来源的数据的可观察性。我的其他组件正在订阅它,因此它需要从一开始就可以观察到,并且数据可用时会馈送其他组件。这个目的的正确模式是什么? – LeniM

+0

我已经更新了答案 – olsn

+0

非常感谢。你钉了它! – LeniM