2014-10-20 30 views
4

我想用RxJs实现时间到期缓存。这里是例如“正常”的缓存:如何在RxJS中实现时间到期热点观察(或一般在无效扩展中)

//let this represents "heavy duty job" 
var data = Rx.Observable.return(Math.random() * 1000).delay(2000); 

//and we want to cache result 
var cachedData = new Rx.AsyncSubject(); 
data.subscribe(cachedData); 

cachedData.subscribe(function(data){ 
    //after 2 seconds, result is here and data is cached 
    //next subscribe returns immediately data 
    cachedData.subscribe(function(data2){ /*this is "instant"*/ }); 
}); 

cachedDatasubscribe被称为首次“重型作业”之称,并在2秒后的结果保存在cachedDataAsyncSubject)。 上的任何其他后续subscribe都会立即返回保存的结果(因此缓存实现)。

我想达到的目的是在cachedData内“调味”这段时间,这段时间内是有效的,当那段时间过去后,我想重新运行“重型工作”来获得新的数据和缓存这又为新的时间段,等等

期望的行为:

//pseudo code 
cachedData.youShouldExpireInXSeconds(10); 


//let's assume that all code is sequential from here 

//this is 1.st run 
cachedData.subscribe(function (data) { 
    //this first subscription actually runs "heavy duty job", and 
    //after 2 seconds first result data is here 
}); 

//this is 2.nd run, just after 1.st run finished 
cachedData.subscribe(function (data) { 
    //this result is cached 
}); 

//15 seconds later 
// cacheData should expired 
cachedData.subscribe(function (data) { 
    //i'm expecting same behaviour as it was 1.st run: 
    // - this runs new "heavy duty job" 
    // - and after 2 seconds we got new data result 
}); 


//.... 
//etc 

我是新来的Rx(JS),并无法弄清楚如何实施这个炎热的观察到有冷却时间。

回答

5

您只需要安排一项任务,在一段时间后用新的AsyncSubject代替cachedData。以下是如何做到这一点作为一个新Rx.Observable方法:

Rx.Observable.prototype.cacheWithExpiration = function(expirationMs, scheduler) { 
    var source = this, 
     cachedData = undefined; 

    // Use timeout scheduler if scheduler not supplied 
    scheduler = scheduler || Rx.Scheduler.timeout; 

    return Rx.Observable.create(function (observer) { 

     if (!cachedData) { 
      // The data is not cached. 
      // create a subject to hold the result 
      cachedData = new Rx.AsyncSubject(); 

      // subscribe to the query 
      source.subscribe(cachedData); 

      // when the query completes, start a timer which will expire the cache 
      cachedData.subscribe(function() { 
       scheduler.scheduleWithRelative(expirationMs, function() { 
        // clear the cache 
        cachedData = undefined; 
       }); 
      }); 
     } 

     // subscribe the observer to the cached data 
     return cachedData.subscribe(observer); 
    }); 
}; 

用法:

// a *cold* observable the issues a slow query each time it is subscribed 
var data = Rx.Observable.return(42).delay(5000); 

// the cached query 
var cachedData = data.cacheWithExpiration(15000); 

// first observer must wait 
cachedData.subscribe(); 

// wait 3 seconds 

// second observer gets result instantly 
cachedData.subscribe(); 

// wait 15 seconds 

// observer must wait again 
cachedData.subscribe(); 
+0

是啊,这是它。只是想知道“createWithDisposable”和“create”之间有什么区别,因为两者都很好。 – Tomo 2014-10-21 21:49:46

+2

在较早的版本中,'create'预计你会返回一个裸处理函数,'createWithDisposable'则期望你返回'Disposable'(一个带有'dispose'方法的对象)。看起来在最近的版本中,'create'已经足够聪明来处理这两种类型的返回值,现在没有什么区别。感谢这个问题,因为现在我可以停止使用'createWithDisposable'。我总是不喜欢它。 :) – Brandon 2014-10-22 02:01:55