2017-03-15 49 views
1

我的具体情况是,我想运行localforage和休息端点,看起来像之间的缓存种族:资源信令可观察到年底,但捕捉信号

  • 开始请求
  • 如果localforage在休息之前返回,向用户发送信号
  • 休息返回时,向用户发送信号
  • 如果在localforage之前返回了其他休息,则不发信号通知localforage响应。

这里的想法是,在大多数情况下,localforage会有一些可能非常快速返回的过期数据,但是当剩余端点请求完成时数据将被更新。我可以对这两个流进行合并,假设localforage将始终在其他请求之前返回(这是一个不合理的假设,但仍然是一个假设)。问题是,如果其他响应具有权威性并且出于某种原因比本地草稿响应更快,则在之后将会发出过时的缓存数据正确的休息响应。

所以我有点想在这些流上做一个.race(),但不是真的,因为如果streamA首先完成,我想要与streamB合并。如果streamB首先完成,我想取消streamA。

到目前为止,我已经打了takeUntilrace,而这种作品:

const streamA = Rx.Observable.fromFOO(); 
const streamB = Rx.Observable.fromBAR().publish(); 
const cacheRace = streamA.takeUntil(streamB).merge(streamB); 
cacheRace.subscribe(listener); 
streamB.connect(); 

我需要使用发布/连接东西,因为streamB否则实际上揭开序幕两个静止请求 - 一个用于takeUntil,一个用于合并。这很不令人满意。感觉像我缺少的操作符是takeUntilButNextOnce()或其他东西,或者是合并两个流但完成合并流的操作符,如果两个流中的一个完成(但是如果另一个完成则不完成)。

我可以定制的观测和/或主题,只是处理内部保持状态,而不是反复折腾封闭了范围,然后让我在这里使用普通的旧逻辑。

回答

1

结合一个简单的mergetakeUntil应该做的 - 也只是share其余的呼叫,应该照顾你提到的双重要求的问题。

const fromRest$ = myRestService.getDataFromServer().share(); 
const fromForage$ = ...; 
data$ = Observable.merge(
    fromRest$, 
    fromForage$.takeUntil(fromRest$) 
);