2017-02-10 106 views
2

我想监视何时使用observables将对象推到数组上。我想从一个空数组开始,当发生推时,我希望observable检测并处理它,然后等到下一次推。这与观察者等待事件的“fromEvent”非常相似。下面的代码立即调用completed(),因为数组是空的,我如何使它等待推送?请注意:输入机制不一定是数组。任何类型的消息队列对象都适用于我。rxjs观察数组推动

回答

2

你可以继承Array和实施某种形式的通知机制,当推碰巧告诉你(这是真的裸露的骨头):

class CustomArray extends Array { 
    push(e) { 
    super.push(e) 
    if (this._listeners) { 
     this._listeners.forEach(l => l(e)) 
    } 
    } 
    addPushListener(listener) { 
    this._listeners = this._listeners || [] 
    this._listeners.push(listener) 
    } 
    removePushListener(listener) { 
    if (this._listeners) { 
     const index = this._listeners.indexOf(listener) 
     if (index >= 0) { 
     this._listeners.splice(index, 1) 
     } 
    } 
    } 
} 

然后用功能,你可以包装成一个Observable

const observePushes = array => Rx.Observable.fromEventPattern(
    array.addPushListener.bind(array), 
    array.removePushListener.bind(array) 
) 

然后,您可以随时订阅更改并取消订阅,与其他可观察项目一样。

const arr = new CustomArray() 
const pushObservable = observePushes(arr) 

const subscription = pushObservable.subscribe(e => console.log(`Added ${e}`)) 

arr.push(1) 
arr.push(2) 
arr.push(3) 
arr.push("a") 

subscription.dispose() 

arr.push("b") 

还介意这个Observable从来没有真正完成,因为在时间没有一点可以保证,没有更多将被添加到阵列中。

小提琴:http://jsfiddle.net/u08daxdv/1/

2

如果你正在试图做的一切就是创建一个可观察,你可以“推”的值到,我建议使用RXJS主题。

const date$ = new Rx.Subject(); 
date$.next(new Date()); 

现在你有Date对象的可观察数据流,你可以“推”与next()方法。

如果您确实需要为您的队列设置中间(非可见)数据类型,那么我建议使用新的ES6功能代理。

const queue = new Proxy([], { 
    set: function(obj, prop, value) { 
    if (!isNaN(prop)) { 
     date$.next(value) 
    } 
    obj[prop] = value 
    return true 
    }, 
}) 

现在你有一个被代理的数组,这样任何时候增加一个值,它都会被添加到你的Observable流中。

+0

我应该补充的一件事是,在开始向其中推入值之前,您需要订阅主题,因为它会很“热”。 –

0

如果你想模仿一些异步/延迟,你可以做你的服务类像这样(装好的GIF格式!):

constructor() { 
    this.source = new Subject<Array<any>>() 
} 

myObservableArray(): Observable<Array<any>> { 
    setTimeout(() => { 
    this.source.next([1, 2, 3]) 
    }, 3000) 

    return this.source 
} 

..所以您可以稍后obj.myObservableArray().subscribe