(在RxKotlin和RxJava工作,但使用元代码为简单起见)如何动态更新RX Observable?
许多Reactive Extensions
指南首先从已有的数据创建一个Observable
。从The introduction to Reactive Programming you've been missing,它是从一个字符串创建
var soureStream= Rx.Observable.just('https://api.github.com/users');
同样,从RxKotlin的头版,从填充的列表
val list = listOf(1,2,3,4,5)
list.toObservable()
现在考虑一个简单的过滤器,可以生成一个outStream
,
var outStream = sourceStream.filter({x > 3})
在这两个引导源事件的声明先验。这意味着事件的时间表有某种形式的
source: ----1,2,3,4,5-------
out: --------------4,5---
如何修改sourceStream
变得更加管道的?换句话说,在创建sourceStream
期间没有输入数据可用?当源事件可用,则立即进行处理:
source: ---1--2--3-4---5-------
out: ------------4---5-------
我希望能够找到一个Observable.add()
进行动态更新
var sourceStream = Observable.empty()
var outStream = sourceStream.filter({x>3})
//print each element as its added
sourceStream .subscribe({println(it)})
outStream.subscribe({println(it)})
for i in range(5):
sourceStream.add(i)
这可能吗?
我认为这就是所谓的一个PublishSubject。 – EpicPandaForce
好吧,是的,这看起来像我所需要的。谢谢,,永远不会发现:) –
你*可能*想*看看'PublishRelay'从https://github.com/JakeWharton/RxRelay/,因为这不会停止发布错误事件后的事件。 – EpicPandaForce