RxJS新手问题即将到来! 所以我有这个基本的缓冲区,它将source1和source2中的所有内容附加到数组中。在某些情况下,缓冲区被清除。Rxjs缓冲区执行反压
var buffer = Rx.Observable.merge(source1, source2).scan(
function (arr, item) {
if (!magic) {
return arr.push(item);
}
else {
return [item]; //Clear the buffer from previous items
}
}, []);
我也希望有一个“消费者”的缓冲区,从缓冲区转移项目,并与他们做事情。我如何实现这一点,并确保消费者更新缓冲区可观察性?
编辑:我想将数据输入到SourceBuffer,但只允许在数据不更新时附加数据。这让我感到背压情况。所以我尝试创建一个controlled observable,但无法弄清楚如何使用自己的缓冲创建自己的版本。
请在问题中添加一些细节。 – tMJ