2017-05-12 38 views
1

(在RxKotlin和RxJava工作,但使用元代码为简单起见)如何动态更新RX Observable?

许多Reactive Extensions指南首先从已有的数据创建一个Observable。从The introduction to Reactive Programming you've been missing,它是从一个字符串创建

var soureStream= Rx.Observable.just('https://api.github.com/users'); 

同样,从RxKotlin的头版,从填充的列表

val list = listOf(1,2,3,4,5) 
list.toObservable()  

现在考虑一个简单的过滤器,可以生成一个outStream

var outStream = sourceStream.filter({x > 3}) 

在这两个引导源事件的声明先验。这意味着事件的时间表有某种形式的

source: ----1,2,3,4,5------- 
out: --------------4,5--- 

如何修改sourceStream变得更加管道的?换句话说,在创建sourceStream期间没有输入数据可用?当源事件可用,则立即进行处理:

source: ---1--2--3-4---5------- 
out: ------------4---5------- 

我希望能够找到一个Observable.add()进行动态更新

var sourceStream = Observable.empty() 
var outStream = sourceStream.filter({x>3}) 

//print each element as its added 
sourceStream .subscribe({println(it)}) 
outStream.subscribe({println(it)}) 

for i in range(5): 
    sourceStream.add(i) 

这可能吗?

+1

我认为这就是所谓的一个PublishSubject。 – EpicPandaForce

+0

好吧,是的,这看起来像我所需要的。谢谢,,永远不会发现:) –

+1

你*可能*想*看看'PublishRelay'从https://github.com/JakeWharton/RxRelay/,因为这不会停止发布错误事件后的事件。 – EpicPandaForce

回答

1

我是新的,但我怎么能解决我的问题没有主题?如果我是 测试应用程序,并且我希望它每隔5秒钟“弹出”一次更新,除了此发布订阅 业务之外,还可以做其他事情吗?有人可以发布一个不涉及 涉及订阅者的问题吗?

如果你想弹出每五秒钟更新,然后创建一个可观察与interval操作,不使用主题。有几十个不同的操作符用于构建Observables,所以你很少需要一个主题。

这就是说,有时你确实需要一个,并且他们在测试代码时非常方便。我在单元测试中广泛使用它们。

To Use Subject Or Not To Use Subject?是和学科主题的优秀文章。

+0

谢谢,这看起来像我需要的东西。我也会阅读那篇文章 –