2016-11-11 90 views
3

我想要做什么,我认为是一个pausable buffer如何创建一个pausableBuffer W/rxjs 5

我有某人分享他们的代码,但我无法弄清楚如何把它变成一个定制。操作(没有打字稿/只ES6

const attach = Rx.Observable.timer(0 * 1000, 8 * 1000).mapTo('@'); 
const detach = Rx.Observable.timer(4 * 1000, 8 * 1000).mapTo('#'); 

const input = Rx.Observable.interval(1* 1000); 
const pauser = attach.mapTo(true).merge(detach.mapTo(false)); 

input 
    .publish(_input => _input 
    .combineLatest(pauser, (v, b) => b) 
    .filter(e => e) 
    .publish(_switch => _input.bufferWhen(() => _switch.take(1))) 
) 
    .flatMap(e => Rx.Observable.from(e)) 
    .concatMap(e => Rx.Observable.empty().delay(150).startWith(e)) 

有人可以帮助我创造这样我可以做input.pausableBuffer(pauser)(也许定义startsWith)

回答

4

你可以把它添加到原型是这样的:

var pausableBuffer = function(pauser) { 
    return this.publish(_input => _input 
    .combineLatest(pauser, (v, b) => b) 
    .filter(e => e) 
    .publish(_switch => _input.bufferWhen(() => _switch.take(1))) 
) 
    .flatMap(e => Rx.Observable.from(e)); 
} 

Rx.Observable.prototype.pausableBuffer = pausableBuffer; 

有一点要记住的是,这将从暂停状态开始。要在活动状态下启动它,请将.startWith(true)添加到pauser

var pausableBuffer = function(pauser) { 
    return this.publish(_input => _input 
    .combineLatest(pauser.startWith(true), (v, b) => b) 
    .filter(e => e) 
    .publish(_switch => _input.bufferWhen(() => _switch.take(1))) 
) 
    .flatMap(e => Rx.Observable.from(e)); 
} 

Rx.Observable.prototype.pausableBuffer = pausableBuffer;