1
我试图通过observable对一组内容进行流式处理,并在第一个错误后停止。将它看作一系列项目是简单的,因为它的行为方式相同。rxjs在错误后立即完成了observable而不是继续
- 我创建可观察到的从项
- 的阵列的每个项目映射到URL
- 调用URL作为请求承诺
- 执行一个catch()返回一个observable.empty ()中的错误
使用RxJS 5的事件:
rx.Observable.from(array)
.map(self.createUrl)
.flatMap(x => {
var options = {
uri: url,
headers: {
"Content-Type": "application/json"
};
return rx.Observable.fromPromise(request-promise(options));
})
.catch(() => {
return rx.Observable.empty();})
.subscribe(x => console.log('success:', x),
e => console.log('error'),
() => console.log('complete'));
执行此序列时,代码在遇到第一个错误后停止。我怀疑#4中的空观察者正在终止观测值,但我不知道为什么。
我期望的过程是无论错误如何处理数组中的所有项目 - 最终处理所有成功项目并在每个错误后恢复。
这种行为是有道理的。如果发生错误,原始观测值将完成并被替换为空值。我认为你正在寻找['onErrorResumeNext()'](https://github.com/Reactive-Extensions/RxJS/blob/master/doc/api/core/operators/onerrorresumenext.md)。使用它,例如,结合'flatMap' – slezica
我试图通过包装.from(数组)行: rx.Observable.onErrorResumeNext(rx.Observable.from(array)), 这样做什么都没有。 我不知道如何将它从flatMap包装到请求?或者我错过了什么? –
对不起 - pebkac。将可观察*包裹在里面*平面地图就是你说的。谢谢。它看起来像我还没有解决答复。 –