2017-08-24 76 views
1

摘要:我正在使用Rxjs和新手。我想实现这样一个可观察的情景,但迄今为止还没有运气。rxjs缓冲区,直到超时重新设置新的arg到达

有一个函数loadDetailsFromServer(itemIds),它调用服务器API并传递一些项目ID。这个功能被称为零星的。为了优化服务器调用,下面是我想要做的事情: 随着第一个函数调用的到来,超时被触发。如果超时之前有任何新的函数调用到达,timout将被重置为重新启动。当超时启动时,进行服务器调用,并且参数计数重置为零。

这里是一个大理石ISH图:

Timer is 4 clicks. 
INPUTS IN TIME  1-2---3-4-----5--------6-7-------- 
loadDetailsFromServer [1,2,3,4] -  [5]   -[6,7] 

function called with [1,2,3,4] because no more calls after 4 clicks. 

提示:这是类似的搜索框样本,并从服务器得到的结果,除了中间值感兴趣,而不是忽略/跳过。

回答

0

例如,如果你有源可观察到这样的:

const Rx = require('rxjs/Rx'); 
const Observable = Rx.Observable; 

const TIMEOUT = 1000; 

const source = Observable.range(1, 20) 
    .concatMap(v => Observable.of(v).delay(Math.random() * 2000)); 

然后你就可以使用scan缓冲它的价值。重置缓冲区我正在使用.merge(bufferNotifier.mapTo(null))。然后switchMap()我总是等待012ms发射1000ms。如果没有它的另一个观察的“覆盖”,因为新到达缓冲器:

const bufferNotifier = new Subject(); 

const chain = source 
    .do(undefined, undefined,() => bufferNotifier.complete()) // properly complete the chain 
    .merge(bufferNotifier.mapTo(null)) // reset buffer Subject 
    .scan((acc, val) => { 
     if (val === null) { 
      return []; 
     } 
     acc.push(val); 
     return acc; 
    }, []) 
    .filter(arr => arr.length > 0) 
    .switchMap(buffer => { // wait 1s until emitting the buffer further 
     return Observable.forkJoin(
      Observable.of(buffer), 
      Observable.timer(1000).take(1), 
      arr => arr 
     ); 
    }) 
    .do(() => bufferNotifier.next()) // trigger reset the buffer 
    .subscribe(console.log); 

此输出例如:

[ 1 ] 
[ 2 ] 
[ 3, 4 ] 
[ 5 ] 
[ 6, 7 ] 
[ 8, 9, 10, 11, 12 ] 
[ 13 ] 
[ 14, 15 ] 
[ 16 ] 
[ 17 ] 
[ 18 ] 
[ 19, 20 ] 
0

如果你也有类似的source可观察到马丁的回答,这样的事情可以工作:

source 
    .buffer(source.debounceTime(250)) 
    .subscribe(console.log); 

buffer收集所有发出的值,直到给定的观察到的发射。在这种情况下,它会等到debounceTime发出。 CodePen:https://codepen.io/anon/pen/PKBaZm?editors=1010