2017-07-04 51 views
0

我想要达到这是(与角2 /打字稿):RxJS ZIP不工作时forkJoin做

  • 而观察到的生产活动流。

  • 对于Observable A的每个事件,进行8个不同的http调用。 (8个switchmaps)

  • 在所有8个请求返回后,做一些事情(订阅8个switchmaps的zip)。

  • 重复8名而观察到的(由switchmap和zip照顾)的每个事件的请求

代码:(在https://plnkr.co/edit/44yqw0RYzC7v1TFACMx1完整的代码)

let source = Observable 
.interval(5000) 
.take(100); 

let requests = []; 

for(let i=0; i<8;i++) { 
    let request = source.switchMap(x=> http.get('https://jsonplaceholder.typicode.com/users/'+(i+1))).publish(); 
    request.subscribe(res => console.log(res.json())); 
    requests.push(request); 
} 

Observable.zip(requests) 
.subscribe(console.log("All requests completed")); 

requests.forEach(r => r.connect()); 

问题是我的拉链永远不会被召唤。我console.log'订阅了每个8个switchmaps,并且我收到日志,显示每次有Observable/stream A中的事件时,都会成功返回8个http调用。(也可以看到在网络选项卡中返回的8个调用的调试工具)

但是zip从不发射任何东西。


如果我尝试不同的(不太理想)的方法:

  • 订阅而观察到的一次(不switchmap)
  • 在订阅创建8个可观测量每个HTTP调用,并订阅ForkJoin的8个观测量

代码:(在https://plnkr.co/edit/GqQde1Ae2licBjtL0jcj全码)

let source = Observable 
.interval(5000) 
.take(100); 

source.subscribe(x=> { 
    console.log(x); 
    let requests = []; 

    for(let i=0; i<8;i++) { 
    let request = http.get('https://jsonplaceholder.typicode.com/users/'+(i+1)).publish(); 
    request.subscribe(res => console.log(res.json())); 
    requests.push(request); 
    } 

    Observable.forkJoin(requests) 
    .subscribe(console.log("All requests completed")); 

    requests.forEach(r => r.connect()); 

}); 

This works。但是每当Observable A发出时,我都会创建8 + 1个嵌套的observables /订阅。

(在我使用发布/连接到共享/重用订阅,但即使没有它的问题存在两种情况)

+1

显示一些代码。 –

+0

@RobinDijkhof新增了它。花了一些时间来从我的应用程序中提取主要逻辑 – flak37

回答

1

你,如果你正确地调用zip有多个参数,并通过一个函数第一个例子会工作订阅(而不是未定义的console.log的结果)。 Demo

Observable.zip(...requests) // <-- spread this 
    .subscribe(() => console.log("All requests completed")); // <-- pass a function 

requests.forEach(r => r.connect()); 
+0

This Works!谢谢。但我很好奇为什么forkJoin,combineLatest等工作没有散布论证。 (以及为什么订阅直接与他们console.log(),而不通过一个lambda /箭头函数)? – flak37

+1

@ flak37“为什么forkJoin,combineLatest等工作没有传播”ad-hoc多态性。当第一个参数是数组时,它们有特殊的处理。 [code](https://unpkg.com/[email protected]/observable/ForkJoinObservable.js) –

+1

@ flak37“还有为什么订阅直接与console.log()一起工作”它实际上并不工作。简单的证明:https://plnkr.co/edit/k0XW99xu8WLdMY3ecOl9?p=preview –