2017-04-25 61 views
2

这个可观察的轮询每秒钟调用getPromise()函数。在getPromise()函数返回3个promise后,它停止解析它们。如何检测getPromise()函数没有解决/拒绝过去的任何承诺,比如说2秒,然后调用onError处理函数。我试图让它与timeout运营商合作无济于事。有任何想法吗?如何让可观察的轮询检测到未完成的承诺

Rx.Observable.interval(1000) 
 
    .switchMap(() => Rx.Observable.fromPromise(getPromise())) 
 
    .subscribe(onValue, onError); 
 

 
function onValue(value){ 
 
    console.log('value: ', value); 
 
} 
 
function onError(error){ 
 
    console.log('error: ', error); 
 
} 
 
var getPromise = (function(){ 
 
    var counter = 3; 
 
    return function(){ 
 
    return new Promise(function(resolve, reject){ 
 
     if(counter > 0) resolve(1); 
 
     counter--; 
 
    }) 
 
    } 
 
})();
<script src="https://cdnjs.cloudflare.com/ajax/libs/rxjs/5.3.0/Rx.js"></script>

回答

2

可以使用race操作,订阅只发出第一观察到。

你说你想在2个不活动状态之后调用onError处理程序。这与使用switchMap相矛盾,当从回调中返回新的Observable时,它自动取消订阅。所以你可能想用exhaustMap代替。此外,当您发出错误通知时,该链会取消订阅,您将永远不会收到任何其他值。这意味着您不应该将超时设置为error或使用retry运算符自动重新订阅(但这实际上取决于您要实现的内容)。

这是您更新的示例,该示例仅使用race()运算符。

Rx.Observable.interval(1000) 
 
    .switchMap(() => 
 
    Rx.Observable.race(
 
     Rx.Observable.fromPromise(getPromise()), 
 
     Rx.Observable.timer(0, 1000).mapTo(42) 
 
    ) 
 
) 
 
    .subscribe(onValue, onError); 
 

 
function onValue(value){ 
 
    console.log('value: ', value); 
 
} 
 
function onError(error){ 
 
    console.log('error: ', error); 
 
} 
 
var getPromise = (function(){ 
 
    var counter = 3; 
 
    return function(){ 
 
    return new Promise(function(resolve, reject){ 
 
     if(counter > 0) resolve(1); 
 
     counter--; 
 
    }) 
 
    } 
 
})();
<script src="https://cdnjs.cloudflare.com/ajax/libs/rxjs/5.3.0/Rx.js"></script>

编辑:2秒不活动之后发送单个错误通知。

Rx.Observable.interval(1000) 
 
    .switchMap(() => Rx.Observable.fromPromise(getPromise())) 
 
    .timeout(2000) 
 
    .subscribe(onValue, onError); 
 

 
function onValue(value){ 
 
    console.log('value: ', value); 
 
} 
 
function onError(error){ 
 
    console.log('error: ', error); 
 
} 
 
var getPromise = (function(){ 
 
    var counter = 3; 
 
    return function(){ 
 
    return new Promise(function(resolve, reject){ 
 
     if(counter > 0) resolve(1); 
 
     counter--; 
 
    }) 
 
    } 
 
})();
<script src="https://cdnjs.cloudflare.com/ajax/libs/rxjs/5.2.0/Rx.js"></script>

有真的在5.3.0中的错误不能直接在timeout()运营商,但在调度异步操作。 https://github.com/ReactiveX/rxjs/pull/2580

没有timeout()操作:

Rx.Observable.interval(1000) 
    .switchMap(() => 
    Rx.Observable.race(
     Rx.Observable.fromPromise(getPromise()), 
     Rx.Observable.timer(0, 2000).map(function(_) { 
     throw new Error('timeout'); 
     }) 
    ) 
) 
    .subscribe(onValue, onError); 
+0

嘿,这是一个很好的解决方案!在获得第42名后,你会如何取消订阅?我不需要其余的人。 –

+0

这个可观察到的应该在2s过去之后取消写入,并且没有发射任何值。 –

+0

@EugeneEpifanov查看我使用'timeout()'引发错误的更新。它实际上看起来像在'timeout()'运算符中的RxJS 5.3.0中存在一个错误。使用RxJS 5.2.0它的工作原理应该是我想的。最近对'timeout()'运算符进行了更改,但是https://github.com/ReactiveX/rxjs/blob/master/CHANGELOG.md#530-2017-04-03 – martin