可以使用race
操作,订阅只发出第一观察到。
你说你想在2个不活动状态之后调用onError
处理程序。这与使用switchMap
相矛盾,当从回调中返回新的Observable时,它自动取消订阅。所以你可能想用exhaustMap
代替。此外,当您发出错误通知时,该链会取消订阅,您将永远不会收到任何其他值。这意味着您不应该将超时设置为error
或使用retry
运算符自动重新订阅(但这实际上取决于您要实现的内容)。
这是您更新的示例,该示例仅使用race()
运算符。
Rx.Observable.interval(1000)
.switchMap(() =>
Rx.Observable.race(
Rx.Observable.fromPromise(getPromise()),
Rx.Observable.timer(0, 1000).mapTo(42)
)
)
.subscribe(onValue, onError);
function onValue(value){
console.log('value: ', value);
}
function onError(error){
console.log('error: ', error);
}
var getPromise = (function(){
var counter = 3;
return function(){
return new Promise(function(resolve, reject){
if(counter > 0) resolve(1);
counter--;
})
}
})();
<script src="https://cdnjs.cloudflare.com/ajax/libs/rxjs/5.3.0/Rx.js"></script>
编辑:2秒不活动之后发送单个错误通知。
Rx.Observable.interval(1000)
.switchMap(() => Rx.Observable.fromPromise(getPromise()))
.timeout(2000)
.subscribe(onValue, onError);
function onValue(value){
console.log('value: ', value);
}
function onError(error){
console.log('error: ', error);
}
var getPromise = (function(){
var counter = 3;
return function(){
return new Promise(function(resolve, reject){
if(counter > 0) resolve(1);
counter--;
})
}
})();
<script src="https://cdnjs.cloudflare.com/ajax/libs/rxjs/5.2.0/Rx.js"></script>
有真的在5.3.0中的错误不能直接在timeout()
运营商,但在调度异步操作。 https://github.com/ReactiveX/rxjs/pull/2580
没有timeout()
操作:
Rx.Observable.interval(1000)
.switchMap(() =>
Rx.Observable.race(
Rx.Observable.fromPromise(getPromise()),
Rx.Observable.timer(0, 2000).map(function(_) {
throw new Error('timeout');
})
)
)
.subscribe(onValue, onError);
嘿,这是一个很好的解决方案!在获得第42名后,你会如何取消订阅?我不需要其余的人。 –
这个可观察到的应该在2s过去之后取消写入,并且没有发射任何值。 –
@EugeneEpifanov查看我使用'timeout()'引发错误的更新。它实际上看起来像在'timeout()'运算符中的RxJS 5.3.0中存在一个错误。使用RxJS 5.2.0它的工作原理应该是我想的。最近对'timeout()'运算符进行了更改,但是https://github.com/ReactiveX/rxjs/blob/master/CHANGELOG.md#530-2017-04-03 – martin