我有一个数据流,具有快速传入的数据。我想通过保持顺序将它们插入到数据库中。我有一个数据库,它返回一个承诺,在插入成功时解决。RxJs缓冲区,直到数据库插入(承诺)
我想创建一个Rx流,缓冲新数据,直到缓冲数据被插入。
我该怎么做?
我有一个数据流,具有快速传入的数据。我想通过保持顺序将它们插入到数据库中。我有一个数据库,它返回一个承诺,在插入成功时解决。RxJs缓冲区,直到数据库插入(承诺)
我想创建一个Rx流,缓冲新数据,直到缓冲数据被插入。
我该怎么做?
我相信要得到你想要的东西,你需要创建自己的操作符。从RxJS打破略,你可以得到类似的信息(警告,没有测试)...
export class BusyBuffer<T> {
private itemQueue = new Subject<T>();
private bufferTrigger = new Subject<{}>();
private busy = false;
constructor(consumerCallback: (items: T[]) => Promise<void>) {
this.itemQueue.buffer(this.bufferTrigger).subscribe(items => {
this.busy = true;
consumerCallback(items).then(() => {
this.busy = false;
this.bufferTrigger.next(null);
});
});
}
submitItem(item: T) {
this.itemQueue.next(item);
if(!busy) {
this.bufferTrigger.next(null);
}
}
}
然后可以用作
let busyBuffer = new BusyBuffer<T>(items => {
return database.insertRecords(items);
});
items.subscribe(item => busyBuffer.submitItem(item));
这不完全是纯粹的反应虽然有人可能能够想出更好的东西。
这是什么问题?有'buffer','bufferToggle'或'bufferWhen'运算符。 – martin
问题是我不知道如何使用它们。试图找出,但不知道如何。 –
使用'concatMap',从项目函数返回承诺。 'concatMap'会为你做缓冲,但是RxJS没有背压,所以如果你的数据到达的速度比你写的速度快,你会耗尽内存。 – cartant