2014-12-07 65 views
3

我试图实现对观测一个辅助方法返回一个新的观察到发射只值,直到达到超时:可观察takeUntil行为不端

implicit class ObservableOps[T](obs: Observable[T]) { 

    def timedOut(totalSec: Long): Observable[T] = { 
    require(totalSec >= 0) 
    val timeOut = Observable.interval(totalSec seconds) 
     .filter(_ > 0) 
     .take(1) 
    obs.takeUntil(timeOut) 
    } 
} 

我写了一个测试它,它创建一个可观察在超时后长时间发出第一个值。然而,得到的可观察似乎仍然包括已故值:

test("single value too late for timeout") { 
    val obs = Observable({Thread.sleep(8000); 1}) 
    val list = obs.timedOut(1).toBlockingObservable.toList 
    assert(list === List()) 
} 

试验失败的消息List(1) did not equal List()。我究竟做错了什么?

+0

您正在使用哪个版本的rx-java? – Dimitri 2014-12-07 21:47:03

+0

@Dimitri rxjava-scala-0.15.0我无法升级 – 2014-12-07 22:03:23

回答

2

我怀疑你的Thread.sleep(8000)实际上是阻止你的主线程。你在测试中是否尝试在val obs之后添加println,以便在测试开始后看到它是否正确显示?

这里发生的事情是,你的obs块8秒,你的程序申报,然后您创建新的使用timedOut,这样timedOut只要它被称为看到发射值观测。

使用rx-scala 0.23.0您的timedOut方法有效(除了Observable.interval不会立即发出,因此应该删除filter(_ > 0))。

val obs = Observable.just(42).delay(900.millis) 
val list = obs.timedOut(1).toBlocking.toList 
println(list) // prints List(42) 

val obs = Observable.just(42).delay(1100.millis) 
val list = obs.timedOut(1).toBlocking.toList 
println(list) // prints List()