2017-12-18 215 views
4

警告:RxJS newb在这里。RxJS:如何将多个嵌套的观测数据与缓冲区结合使用

这里是我的挑战:

  1. onUnlink$观察到发射...
  2. 立即开始从onAdd$观察到的捕获值,最多1秒(我会打电话给此分区onAddBuffer$) 。
  3. 查询数据库(创建doc$观察到的)来获取我们将使用来匹配的onAdd$值之一
  4. 如果从onAddBuffer$观察到的一个值的doc$值相匹配的模式,不排放
  5. 如果没有值从onAddBuffer$观察到的doc$值匹配,或者如果onAddBuffer$观察到从来没有发出,发出doc$

这是我最好的猜测:

// for starters, concatMap doesn't seem right -- I want a whole new stream 
const docsToRemove$ = onUnlink$.concatMap(unlinkValue => { 

    const doc$ = Rx.Observable.fromPromise(db.File.findOne({ unlinkValue })) 

    const onAddBuffer$ = onAdd$ 
    .buffer(doc$) // capture events while fetching from db -- not sure about this 
    .takeUntil(Rx.Observable.timer(1000)); 

    // if there is a match, emit nothing. otherwise wait 1 second and emit doc 
    return doc$.switchMap(doc => 
    Rx.Observable.race( 
     onAddBuffer$.single(added => doc.attr === added.attr).mapTo(Rx.Observable.empty()), 
     Rx.Observable.timer(1000).mapTo(doc) 
    ) 
); 
}); 

docsToRemove$.subscribe(doc => { 
    // should only ever be invoked (with doc -- the doc$ value) 1 second 
    // after `onUnlink$` emits, when there are no matching `onAdd$` 
    // values within that 1 second window. 
}) 

这总是会发出EmptyObservable。也许这是因为single在没有匹配时似乎排出undefined,并且我预计它在没有匹配时根本不会发出? find发生同样的事情。

如果我将single更改为filter,则什么都不发出。

FYI:这是文件系统事件的重命名方案 - 如果add事件的unlink事件的1秒钟之内,接着给发出文件哈希匹配,什么也不做,因为它是一个rename。否则,它是一个真正的unlink,它应该发出要删除的数据库文档。

+0

这听起来像你在这里构建了一个非常讨厌的竞赛条件。超时通常不是解决这个问题的好方法 - 如果事情由于某种原因需要更长的时间,则会丢失数据。 –

+1

是的,这里肯定有潜在的竞争条件。它最终可能会挫败这种做法。但它似乎是学习rxjs的好机会。 – glortho

回答

3

这是我的猜测,你怎么可以这样做:

onUnlink$.concatMap(unlinkValue => { 
    const doc$ = Rx.Observable.fromPromise(db.File.findOne({ unlinkValue })).share(); 
    const bufferDuration$ = Rx.Observable.race(Rx.Observable.timer(1000), doc$); 
    const onAddBuffer$ = onAdd$.buffer(bufferDuration$); 

    return Observable.forkJoin(onAddBuffer$, doc$) 
    .map(([buffer, docResponse]) => { /* whatever logic you need here */ }); 
}); 

single()运营商是有点棘手,因为它发出的源可观察完成仅后的谓语功能相匹配的项目(或发出有两个项目或没有匹配项目时出错)。

race()也很棘手。如果其中一个源Observable完成并且没有发出任何值race()将刚刚完成并且不会发出任何东西。我前一段时间曾经报道过,这是正确的行为,请参阅https://github.com/ReactiveX/rxjs/issues/2641
我想这是你的代码出了什么问题。

另请注意,.mapTo(Rx.Observable.empty())会将每个值映射到Observable的实例。如果您想忽略所有值,则可以使用filter(() => false)ignoreElements()运算符。