2017-03-10 32 views
1

我有一个使用RxJava observables设置的事件序列。基本上我合并了Observable.just(Events.*)创建的不同延迟与observable.delay(time, timeUnit, scheduler)函数设置的不同延迟。然后我将它们发布到PublishSubjectevents,代码如下),并订阅该PublishSubject以观察序列(observeEvents()函数在下面的代码中)。它曾经工作得很好,但最近我在我的设备上看到了一个非常奇怪的行为(OnePlus One with android 5.0.2)(并且没有在模拟器上看到它)。基本上,事件会混淆不清,延迟较高的事件可能会在延迟较小的事件之前发生,延迟较小的事件可能会在队列末尾发生,有时所有事件都可能按正确的顺序排列。前3场比赛经常混合在一起。有时候根本没有观察到一些事件。这里会发生什么?RxJava延迟可观察到的火灾意外延迟

的代码在科特林:

var computationScheduler = Schedulers.computation() 

private val events: PublishSubject<Events> = PublishSubject.create() 
private val userActionSubject: PublishSubject<Events> = PublishSubject.create() 

Observable.merge(
      event0(), 
      event1(), 
      event2(), 
      userActionOrEvent3(), 
      userActionOrEvent4()) 
      .subscribe({ 
       // Weird timings are observed here already 
       events.onNext(it) 
      }, { e -> 
       events.onError(e) 
      })) 

private fun userActionOrEvent4(): Observable<Events> { 
    return Observable.amb(Observable.just(Events.Event4) 
      .delay(12800, TimeUnit.MILLISECONDS, computationScheduler), userActionSubject.asObservable().subscribeOn(computationScheduler)) 
      .take(1) 
} 

private fun userActionOrEvent3(): Observable<Events> { 
    return Observable.amb(Observable.just(Events.Event3) 
      .delay(2800, TimeUnit.MILLISECONDS, computationScheduler), userActionSubject.asObservable().subscribeOn(computationScheduler)) 
      .take(1) 
} 

private fun event2() = Observable.just(Events.Event2) 
     .delay(1800, TimeUnit.MILLISECONDS, computationScheduler) 

private fun event1() = Observable.just(Events.Event1) 
     .delay(200, TimeUnit.MILLISECONDS, computationScheduler) 

private fun event0() = Observable.just(Events.Event0) 
     .subscribeOn(computationScheduler) 

open fun observeEvents(): Observable<Events> = events.asObservable().observeOn(AndroidSchedulers.mainThread()) 

open fun onUserAction() { 
    userActionSubject.onNext(Events.Action) 
} 

回答

0

原来,这问题是由计算调度员造成的,当我将Schedulers.computation()更改为Schedulers.newThread()事件在预期时间开始启动。