3
我试图实现对观测一个辅助方法返回一个新的观察到发射只值,直到达到超时:可观察takeUntil行为不端
implicit class ObservableOps[T](obs: Observable[T]) {
def timedOut(totalSec: Long): Observable[T] = {
require(totalSec >= 0)
val timeOut = Observable.interval(totalSec seconds)
.filter(_ > 0)
.take(1)
obs.takeUntil(timeOut)
}
}
我写了一个测试它,它创建一个可观察在超时后长时间发出第一个值。然而,得到的可观察似乎仍然包括已故值:
test("single value too late for timeout") {
val obs = Observable({Thread.sleep(8000); 1})
val list = obs.timedOut(1).toBlockingObservable.toList
assert(list === List())
}
试验失败的消息List(1) did not equal List()
。我究竟做错了什么?
您正在使用哪个版本的rx-java? – Dimitri 2014-12-07 21:47:03
@Dimitri rxjava-scala-0.15.0我无法升级 – 2014-12-07 22:03:23