2017-03-09 100 views
1

我试图通过observable对一组内容进行流式处理,并在第一个错误后停止。将它看作一系列项目是简单的,因为它的行为方式相同。rxjs在错误后立即完成了observable而不是继续

  1. 我创建可观察到的从项
  2. 的阵列的每个项目映射到URL
  3. 调用URL作为请求承诺
  4. 执行一个catch()返回一个observable.empty ()中的错误

使用RxJS 5的事件:

rx.Observable.from(array) 
    .map(self.createUrl) 
    .flatMap(x => { 
      var options = { 
      uri: url, 
      headers: { 
       "Content-Type": "application/json" 
      }; 
      return rx.Observable.fromPromise(request-promise(options)); 
     }) 
    .catch(() => { 
    return rx.Observable.empty();}) 
    .subscribe(x => console.log('success:', x), 
      e => console.log('error'), 
      () => console.log('complete')); 

执行此序列时,代码在遇到第一个错误后停止。我怀疑#4中的空观察者正在终止观测值,但我不知道为什么。

我期望的过程是无论错误如何处理数组中的所有项目 - 最终处理所有成功项目并在每个错误后恢复。

+2

这种行为是有道理的。如果发生错误,原始观测值将完成并被替换为空值。我认为你正在寻找['onErrorResumeNext()'](https://github.com/Reactive-Extensions/RxJS/blob/master/doc/api/core/operators/onerrorresumenext.md)。使用它,例如,结合'flatMap' – slezica

+0

我试图通过包装.from(数组)行: rx.Observable.onErrorResumeNext(rx.Observable.from(array)), 这样做什么都没有。 我不知道如何将它从flatMap包装到请求?或者我错过了什么? –

+0

对不起 - pebkac。将可观察*包裹在里面*平面地图就是你说的。谢谢。它看起来像我还没有解决答复。 –

回答

3

你只需把catch()flatMap内:

rx.Observable.from(array) 
    .map(self.createUrl) 
    .flatMap(x => { 
    var options = { 
     uri: url, 
     headers: { 
     "Content-Type": "application/json" 
     } 
    }; 
    return rx.Observable 
     .fromPromise(request-promise(options)) 
     .catch(() => rx.Observable.empty()); 
    }) 
    .subscribe(x => console.log('success:', x), 
      e => console.log('error'), 
      () => console.log('complete')); 

现在,当再内可观测发出错误,它会立即捕获并通过.flatMap()不会传播到主流。