2017-10-19 166 views
2

我有一个数据流,具有快速传入的数据。我想通过保持顺序将它们插入到数据库中。我有一个数据库,它返回一个承诺,在插入成功时解决。RxJs缓冲区,直到数据库插入(承诺)

我想创建一个Rx流,缓冲新数据,直到缓冲数据被插入。

我该怎么做?

+0

这是什么问题?有'buffer','bufferToggle'或'bufferWhen'运算符。 – martin

+0

问题是我不知道如何使用它们。试图找出,但不知道如何。 –

+1

使用'concatMap',从项目函数返回承诺。 'concatMap'会为你做缓冲,但是RxJS没有背压,所以如果你的数据到达的速度比你写的速度快,你会耗尽内存。 – cartant

回答

2

我相信要得到你想要的东西,你需要创建自己的操作符。从RxJS打破略,你可以得到类似的信息(警告,没有测试)...

export class BusyBuffer<T> { 
    private itemQueue = new Subject<T>(); 
    private bufferTrigger = new Subject<{}>(); 
    private busy = false; 

    constructor(consumerCallback: (items: T[]) => Promise<void>) { 
    this.itemQueue.buffer(this.bufferTrigger).subscribe(items => { 
     this.busy = true; 
     consumerCallback(items).then(() => { 
     this.busy = false; 
     this.bufferTrigger.next(null); 
     }); 
    }); 
    } 

    submitItem(item: T) { 
    this.itemQueue.next(item); 
    if(!busy) { 
     this.bufferTrigger.next(null); 
    } 
    } 

} 

然后可以用作

let busyBuffer = new BusyBuffer<T>(items => { 
    return database.insertRecords(items); 
}); 
items.subscribe(item => busyBuffer.submitItem(item)); 

这不完全是纯粹的反应虽然有人可能能够想出更好的东西。

+2

谢谢!我创造了同样的,但我希望有人能拿出一个纯粹的反应解决方案:) –

+0

没有汗水,祝你好运。我正在考虑从数据库中获取某种繁忙/免费的信号,并将其传回缓冲区方法,但如果数据库没有做任何事情,您还必须添加缓冲区应该立即发出的逻辑。 – Pace

+0

如果你想要更好的东西,不要接受我的答案。我不介意。 – Pace