2017-07-31 78 views
2

如何在代码中稍后创建observable并生成下一个值?我希望能够通过其他异步事件从代码的不同部分调用onNext。将观察者存储在全局变量中并稍后使用

这是我已经尽力了,这是不行的:

var Rx = require('rx'); 

var GlobalObserver; 

var source = Rx.Observable.create(observer => { 
    GlobalObserver = observer; 
}); 

var subscription = source.subscribe(
    x => console.log('onNext: %s', x), 
    e => console.log('onError: %s', e), 
    () => console.log('onCompleted') 
); 

//...later in the code, as a result of another async event: 


    GlobalObserver.onNext(someData); 
    ... 
    GlobalObserver.onNext(someOtherData); 

回答

4

你需要的是某种形式的主题。 ReplaySubjectBehaviorSubjectSubject

创建一个主题,那么你可以做subject.subscribe(...)订阅它。您也可以通过subject.onNext(...)添加到流中。

例如:

var subject = new Rx.Subject(); 

var subscription = subject.subscribe(
    function (x) { console.log('onNext: ' + x); }, 
    function (e) { console.log('onError: ' + e.message); }, 
    function() { console.log('onCompleted'); } 
); 

subject.onNext(1); 
// => onNext: 1 

subject.onNext(2); 
// => onNext: 2 

subject.onCompleted(); 
// => onCompleted 

subscription.dispose(); 

更具体的使用情况下(将每一个成功的HTTP响应返回的时间增加了观察到的流):

var httpResponseStream = new Rx.Subject(); 

var subscription = httpResponseStream.subscribe(function (response) { 
    console.log('HTTP response success: ', response); 
}); 

makeAJAXCall().then(function (response) { 
    httpResponseStream.onNext(response); 
}); 

至于另一位用户声称,要确保如果您使用的是V5,则将所有onNext的更改为next。如果您使用V4,请坚持onNext

1

我假设这是因为您正在使用rxjs版本^ 5.0.0并阅读版本^ 4.0.0的文档。

对于RxJs版本^5.0.0它是前人的精力,而不是observer.next(value)observer.onNext(value)

这里您可以找到docs为RxJs版本^5.0.0

+0

这绝对应该是一个评论,不是答案。 – DavidDomain