2015-09-04 72 views
2

RxJS新手问题即将到来! 所以我有这个基本的缓冲区,它将source1和source2中的所有内容附加到数组中。在某些情况下,缓冲区被清除。Rxjs缓冲区执行反压

var buffer = Rx.Observable.merge(source1, source2).scan(
    function (arr, item) { 
    if (!magic) { 
     return arr.push(item); 
    } 
    else { 
     return [item]; //Clear the buffer from previous items 
    } 
    }, []); 

我也希望有一个“消费者”的缓冲区,从缓冲区转移项目,并与他们做事情。我如何实现这一点,并确保消费者更新缓冲区可观察性?

编辑:我想将数据输入到SourceBuffer,但只允许在数据不更新时附加数据。这让我感到背压情况。所以我尝试创建一个controlled observable,但无法弄清楚如何使用自己的缓冲创建自己的版本。

+0

请在问题中添加一些细节。 – tMJ

回答

2

所以我有这个基本的缓冲区,它将源码1和源码2中的所有内容附加到数组中。在某些情况下,缓冲区被清除。

你需要做的是:

var sourceStream = Rx.Observable.merge(soruce1, source2); 

var boundary = sourceStream.lift(someOperator) // for example sourceStream.skip(3); 
// someOperator is where you perform the "magic" 

var subscribeToThisStream = sourceStream.buffer(boundary); 
// emits all items collected in the buffer between two boundary emitions 

我也希望有一个缓冲区,从缓冲区他们转移项目和做事的“消费者”。我如何实现这一点,并确保消费者更新缓冲区可观察性?

如果你想做到这一点的用户的手段,这是绝对不可取的,如果你想要做的事情在Rx方式在某些情况下甚至是不可能的。

+0

只是一个愚蠢的问题 - 什么是'lift'的签名?我无法找到它,但我很肯定我曾经在讨论RxJS的内部实现时看到过这种方法。那么它是怎样工作的? –

+0

感谢您的回答tMJ,我更新了我的问题,并提供了更多详细信息 – oskbor

+0

@oskbor我无法确定您是否已经得到答案,或者您对某些事情还不清楚。让我知道 – tMJ