2
我有一个事件流,我想调用一个函数来返回每个事件的承诺,问题是这个函数非常昂贵,所以我想一次最多处理n个事件。如何一次处理RxJS流n项目,一旦项目完成后,又自动填充回n?
这卵石图可能是错的,但是这是我想什么:
---x--x--xxxxxxx-------------x-------------> //Events
---p--p--pppp------p-p-p-----p-------------> //In Progress
-------d--d--------d-d-dd------dddd--------> //Promise Done
---1--21-2-34-----------3----4-3210-------- //QUEUE SIZE CHANGES
这是代码,我到目前为止有:
var n = 4;
var inProgressCount = 0;
var events$ = Rx.Observable.fromEvent(produceEvent, 'click')
.map((ev) => new Date().getTime());
var inProgress$ = events$.controlled();
var done$ = inProgress$
.tap(() => inProgressCount++)
.flatMap((timestamp) => Rx.Observable.fromPromise(expensiveComputation(getRandomInt(1, 5) * 1000, timestamp)));
done$.subscribeOnNext((timestamp) => {
inProgressCount--;
inProgress$.request(Math.max(1, n - inProgressCount));
});
inProgress$.request(n);
有此代码的两个问题:
- 它使用了
inProgressCount
变量,它是用边 更新效果函数秒。 完成$订阅仅在请求超过1个受控流的项目时调用一次。这使得inProgressCount
var更新不正确,这最终将队列限制为一次一个。
你可以看到它在这里工作: http://jsbin.com/wivehonifi/1/edit?js,console,output
问题:
- 有没有更好的方法吗?
- 我该如何摆脱
inProgressCount
变量? 为什么完成$订阅只需要在请求多个项目时调用一次?
更新:
回答问题#3:switchMap是一样的flatMapLatest,所以这就是为什么我只获得了最后一个。将代码更新为flatMap而不是switchMap。
在RxJs 5您必须.mergeMap(映射器(resultSelectorFunction更换.flatMapWithMaxConcurrent | NULL),并发)[mergeMap](http://reactivex.io/rxjs/class/es6/Observable.js~Observable.html#instance-method-mergeMap) – user3717718