2017-02-04 101 views
1

我有一个Observable,我正在使用它将一个承诺转换为订阅。这导致了我需要迭代的集合来调用每个元素上的HTTP服务。我使用forkJoin等待所有这些调用完成,以便我可以做其他事情,但不幸的是,我的订阅没有被调用。你看到我在这里失踪了吗?可观察订阅没有被调用

Observable.fromPromise(this.users.getElements()).subscribe(results => { 
    Observable.forkJoin(
    results.map(
     aUser => this.HttpService.submitUser(aUser).subscribe(
     results => { 
      this.progress += 1; 
     }, 
     err => { 
      this.progress += 1; 
      this.handleError(<any>err); 
     }) 
    ).subscribe(
     //it never gets to either of these calls after all service calls complete 
     data => { 
     debugger; 
     console.log(data); 
     this.reset(); 
     }, 
     err => { 
     debugger; 
     console.log(err); 
     this.reset(); 
     } 
    )); 
}); 

回答

2

有一件事是你不要订阅每个Observable传递给forkJoin()。运营商必须自己做。

如果您想在每个Observable完成时收到通知,您可以使用.do(undefined, undefined,() => {...})

let observables = [ 
    Observable.of(42).do(undefined, undefined,() => console.log('done')), 
    Observable.of('a').delay(100).do(undefined, undefined,() => console.log('done')), 
    Observable.of(true).do(undefined, undefined,() => console.log('done')), 
]; 

Observable.forkJoin(observables) 
    .subscribe(results => console.log(results)); 

这将打印到控制台:

done 
done 
done 
[ 42, 'a', true ] 

最后还有就是.finally()运营商。但是,这与使用.do()不一样。

编辑:

当任何源可观察量失败forkJoin()操作者重新发射的误差(这意味着它也失败)。
这意味着您需要分别在每个源Observable中捕获错误(例如,使用catch()运算符)。

let observables = [ 
    Observable.throw(new Error()) 
    .catch(() => Observable.of('caught error 1')) 
    .do(undefined, undefined,() => console.log('done 1')), 

    Observable.of('a') 
    .delay(100).catch(() => Observable.of('caught error 2')) 
    .do(undefined, undefined,() => console.log('done 2')), 

    Observable.of(true) 
    .catch(() => Observable.of('caught error 3')) 
    .do(undefined, undefined,() => console.log('done 3')), 
]; 

Observable.forkJoin(observables) 
    .subscribe(results => console.log(results)); 

它打印:

done 1 
done 3 
done 2 
[ 'caught error 1', 'a', true ] 
+0

好的,我明白你的意思了,但如果其中一个观察对象有例外,那么forkJoin会在其他人完成之前马上拿起它。我认为forkJoin应该等待所有人完成,但是从源头上看,它会标记为“完成”而不是下一个。在我的例子中,我希望每个observable能够优雅地处理错误并继续。有没有办法做到这一点? – occasl

+0

@occasl查看我的更新。 – martin

0

我不认为你需要在地图上订阅。

Observable.fromPromise(this.users.getElements()).subscribe(results => { 
    Observable.forkJoin(
    results.map(
     aUser => this.HttpService.submitUser(aUser)) 
    ).subscribe(
     //it never gets to either of these calls after all service calls complete 
     data => { 
     debugger; 
     console.log(data); 
     this.reset(); 
     }, 
     err => { 
     debugger; 
     console.log(err); 
     this.reset(); 
     } 
    )); 
}); 

注意的是,在rxjs例子在这里:

https://github.com/Reactive-Extensions/RxJS/blob/master/doc/api/core/operators/forkjoin.md

他们不同意个别观测 - ForkJoin让他们都去,然后等待他们全部返回(在你的订阅)

编辑:

的forkjoin源是在这里:

https://github.com/Reactive-Extensions/RxJS/blob/master/src/core/linq/observable/forkjoin.js

并且看起来好像没有挂钩来确定每次完成的时间。我认为最好的方法来处理一个用户界面栏会让每个映射的observables单独订阅,所有这些都调用一个函数来增加UI计数栏变量,并对“完整性”进行一些测试,以允许您使用这些数据。

+0

呀,后来我将如何增加我(对UI的进度条)为每一个完成柜台?基本上我想在每个人完成时做一些事情,然后在完成时做一些事情。 – occasl

+0

我刚刚查看了forkjoin源代码,我不认为它会公开任何允许您插入并查看每个代码何时进入的内容。我认为您必须单独订阅每个映射并调用某种为您的UI进度条计数功能。 – chrispy