2017-04-25 150 views

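Managing threads in RxJava

I want to implement the Redux pattern for managing state, following Jake Wharton's talk (http://jakewharton.com/the-state-of-managing-state-with-rxjava/). I want everything in the stream to run on a background thread and to receive the output on the Android main thread. With my current setup, however, my Subscriber throws an exception saying that I am touching the UI from a thread other than the Android main thread. Thanks in advance.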

Observable events = Observable.merge(Observable.just(new GetUsersEvent() /*, other event streams */));
events.compose(mergeEvents(GetUsersEvent.class /*, other events */))
        .compose(merged -> merged.map(event -> {
                    // Map each incoming event to the action it triggers.
                    BaseAction action = null;
                    if (event instanceof GetUsersEvent)
                        action = new GetUsersAction();
                    return action;
                })
                .compose(actions -> actions.flatMap(action -> Observable.just(action)
                        .flatMap(a -> {
                            Observable result = Observable.empty();
                            if (a instanceof GetUsersAction)
                                result = userListVM.getUsers()
                                        // I am using a handler thread to receive live updates from Realm.
                                        .subscribeOn(AndroidSchedulers.from(handlerThread.getLooper()));
                            return result;
                        })
                        .map(Result::successResult)
                        .onErrorReturn(Result::errorResult)
                        .startWith(Result.IN_FLIGHT)))
                .scan(initialState, (currentUIModel, result) -> {
                    // Reduce each result into the next UI model.
                    if (result.isLoading())
                        currentUIModel = UIModel.loadingState(bundle);
                    else if (result.isSuccessful())
                        currentUIModel = UIModel.successState(result);
                    else
                        currentUIModel = UIModel.errorState(result.getError());
                    return currentUIModel;
                })
                .observeOn(AndroidSchedulers.mainThread()))
        .subscribe(o -> {/* update UI */}, OnErrorNotImplementedException::new);

The mergeEvents transformer:

public Observable.Transformer<BaseEvent, BaseEvent> mergeEvents(Class... classes) {
    return events -> events.subscribeOn(Schedulers.io())
            .publish(shared -> {
                List<Class> classList = Arrays.asList(classes);
                for (int i = 0, size = classList.size(); i < size; i++)
                    shared = shared.mergeWith(shared.ofType(classList.get(i)));
                return shared;
            });
}

Stack trace:

java.lang.IllegalStateException: Exception thrown on Scheduler.Worker thread. Add `onError` handling. 
    at rx.android.schedulers.LooperScheduler$ScheduledAction.run(LooperScheduler.java:112) 
     at android.os.Handler.handleCallback(Handler.java:746) 
     at android.os.Handler.dispatchMessage(Handler.java:95) 
     at android.os.Looper.loop(Looper.java:148) 
     at android.app.ActivityThread.main(ActivityThread.java:5443) 
     at java.lang.reflect.Method.invoke(Native Method) 
     at com.android.internal.os.ZygoteInit$MethodAndArgsCaller.run(ZygoteInit.java:728) 
     at com.android.internal.os.ZygoteInit.main(ZygoteInit.java:618) 
    Caused by: rx.exceptions.OnErrorNotImplementedException: Expected to be called on the main thread but was RxIoScheduler-2 
     at com.zeyad.usecases.app.components.mvvm.BaseSubscriber.onError(BaseSubscriber.java:36) 
     at rx.observers.SafeSubscriber._onError(SafeSubscriber.java:153) 
     at rx.observers.SafeSubscriber.onError(SafeSubscriber.java:115) 
     at rx.observers.SerializedObserver.onError(SerializedObserver.java:152) 
     at rx.observers.SerializedSubscriber.onError(SerializedSubscriber.java:78) 
     at rx.internal.operators.OperatorTakeUntil$1.onError(OperatorTakeUntil.java:50) 
     at rx.internal.operators.OperatorObserveOn$ObserveOnSubscriber.checkTerminated(OperatorObserveOn.java:273) 
     at rx.internal.operators.OperatorObserveOn$ObserveOnSubscriber.call(OperatorObserveOn.java:216) 
     at rx.android.schedulers.LooperScheduler$ScheduledAction.run(LooperScheduler.java:107) 
     at android.os.Handler.handleCallback(Handler.java:746)  
     at android.os.Handler.dispatchMessage(Handler.java:95)  
     at android.os.Looper.loop(Looper.java:148)  
     at android.app.ActivityThread.main(ActivityThread.java:5443)  
     at java.lang.reflect.Method.invoke(Native Method)  
     at com.android.internal.os.ZygoteInit$MethodAndArgsCaller.run(ZygoteInit.java:728)  
     at com.android.internal.os.ZygoteInit.main(ZygoteInit.java:618)  
    Caused by: java.lang.IllegalStateException: Expected to be called on the main thread but was RxIoScheduler-2 
     at rx.android.MainThreadSubscription.verifyMainThread(MainThreadSubscription.java:58) 
     at com.jakewharton.rxbinding.support.v7.widget.RecyclerViewScrollEventOnSubscribe.call(RecyclerViewScrollEventOnSubscribe.java:19) 
     at com.jakewharton.rxbinding.support.v7.widget.RecyclerViewScrollEventOnSubscribe.call(RecyclerViewScrollEventOnSubscribe.java:10) 
     at rx.Observable.unsafeSubscribe(Observable.java:10346) 
     at rx.internal.operators.OnSubscribeMap.call(OnSubscribeMap.java:48) 
     at rx.internal.operators.OnSubscribeMap.call(OnSubscribeMap.java:33) 
     at rx.Observable.unsafeSubscribe(Observable.java:10346) 
     at rx.internal.operators.OnSubscribeFilter.call(OnSubscribeFilter.java:45) 
     at rx.internal.operators.OnSubscribeFilter.call(OnSubscribeFilter.java:30) 
     at rx.internal.operators.OnSubscribeLift.call(OnSubscribeLift.java:48) 
     at rx.internal.operators.OnSubscribeLift.call(OnSubscribeLift.java:30) 
     at rx.internal.operators.OnSubscribeLift.call(OnSubscribeLift.java:48) 
     at rx.internal.operators.OnSubscribeLift.call(OnSubscribeLift.java:30) 
     at rx.internal.operators.OnSubscribeLift.call(OnSubscribeLift.java:48) 
     at rx.internal.operators.OnSubscribeLift.call(OnSubscribeLift.java:30) 
     at rx.Observable.unsafeSubscribe(Observable.java:10346) 
     at rx.internal.operators.OnSubscribeDoOnEach.call(OnSubscribeDoOnEach.java:41) 
     at rx.internal.operators.OnSubscribeDoOnEach.call(OnSubscribeDoOnEach.java:30) 
     at rx.Observable.unsafeSubscribe(Observable.java:10346) 
     at rx.internal.operators.OnSubscribeDefer.call(OnSubscribeDefer.java:51) 
     at rx.internal.operators.OnSubscribeDefer.call(OnSubscribeDefer.java:35) 
     at rx.Observable.unsafeSubscribe(Observable.java:10346) 
     at rx.internal.operators.OperatorMerge$MergeSubscriber.onNext(OperatorMerge.java:248) 
     at rx.internal.operators.OperatorMerge$MergeSubscriber.onNext(OperatorMerge.java:148) 
     at rx.internal.operators.OnSubscribeFromArray$FromArrayProducer.fastPath(OnSubscribeFromArray.java:76) 
     at rx.internal.operators.OnSubscribeFromArray$FromArrayProducer.request(OnSubscribeFromArray.java:58) 
     at rx.Subscriber.setProducer(Subscriber.java:211) 
     at rx.internal.operators.OnSubscribeFromArray.call(OnSubscribeFromArray.java:32) 
     at rx.internal.operators.OnSubscribeFromArray.call(OnSubscribeFromArray.java:24) 
     at rx.internal.operators.OnSubscribeLift.call(OnSubscribeLift.java:48) 
    at rx.internal.operators.OnSubscribeLift.call(OnSubscr 

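Do you have a stack trace? Could one of the operators in the chain be trying to modify some UI state? I don't see any UI values being changed, and observeOn with AndroidSchedulers.mainThread should switch to the UI thread before the subscriber touches the UI. What does scheduleOn do?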


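I added the stack trace, although it did not help me much. None of the operators touch UI elements; only the subscriber does. I also fixed the scheduleOn typo.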

Answer


"I want everything in the stream to run on a background thread and to receive the output on the Android main thread"

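Because your event sources are UI events, which must be registered on the main thread, you cannot run everything in the stream on a background thread.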

You are applying the IO scheduler inside the mergeEvents transformer, which means that all of the UI events are subscribed to on an IO thread.

You can see the root cause in the log:

Caused by: java.lang.IllegalStateException: Expected to be called on the main thread but was RxIoScheduler-2 
    at rx.android.MainThreadSubscription.verifyMainThread(MainThreadSubscription.java:58) 
    at com.jakewharton.rxbinding.support.v7.widget.RecyclerViewScrollEventOnSubscribe.call(RecyclerViewScrollEventOnSubscribe.java:19) 
    at com.jakewharton.rxbinding.support.v7.widget.RecyclerViewScrollEventOnSubscribe.call(RecyclerViewScrollEventOnSubscribe.java:10) 
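
The kind of check that raises this exception can be modeled without Android or RxJava. The sketch below is only an illustration built on plain JDK executors: the class name, the register() method, and the thread names "main-sim" and "io-sim" are all hypothetical stand-ins, not RxBinding code. It shows that a source which verifies its thread at subscription time succeeds or fails depending purely on which thread the subscription runs on.

```java
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

class SubscribeThreadCheck {
    // Hypothetical stand-in for a UI event source: it refuses to be
    // registered from any thread other than the simulated main thread.
    static String register() {
        String name = Thread.currentThread().getName();
        if (!name.equals("main-sim")) {
            throw new IllegalStateException(
                    "Expected to be called on the main thread but was " + name);
        }
        return "registered on " + name;
    }

    // Runs register() on a single-threaded executor whose worker thread has
    // the given name, and returns the success or the failure message.
    static String registerOn(String threadName) throws InterruptedException {
        ExecutorService executor =
                Executors.newSingleThreadExecutor(r -> new Thread(r, threadName));
        try {
            Callable<String> task = SubscribeThreadCheck::register;
            return executor.submit(task).get();
        } catch (ExecutionException e) {
            // The exception thrown inside register() arrives as the cause.
            return e.getCause().getMessage();
        } finally {
            executor.shutdown();
        }
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println(registerOn("main-sim"));
        System.out.println(registerOn("io-sim"));
    }
}
```

In the question's code, the subscribeOn(Schedulers.io()) call in mergeEvents plays the role of registerOn("io-sim"): it moves the subscription itself, and therefore the thread check, onto an IO thread.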

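You are trying to subscribe to the RecyclerView scroll events from a thread other than the main thread.
You can solve this by subscribing to mergeEvents on the main thread and then placing an observeOn with a background scheduler of your choice after it, so that the rest of the stream is processed on that background thread. The observeOn(AndroidSchedulers.mainThread()) at the end of your chain then delivers the results back to the UI thread.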
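
The resulting order of thread hops can be sketched with JDK executors alone. This is a simplified model under stated assumptions, not RxJava code: the class name and the thread names "main-sim" and "bg-sim" are hypothetical, and CompletableFuture stages stand in for the operators. Stage 1 corresponds to subscribeOn(AndroidSchedulers.mainThread()), stage 2 to an observeOn with a background scheduler such as Schedulers.io(), and stage 3 to the final observeOn(AndroidSchedulers.mainThread()).

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

class ThreadHopSketch {
    // Records which (simulated) thread each stage of the pipeline runs on.
    static String run() throws InterruptedException, ExecutionException {
        ExecutorService main =
                Executors.newSingleThreadExecutor(r -> new Thread(r, "main-sim"));
        ExecutorService background =
                Executors.newSingleThreadExecutor(r -> new Thread(r, "bg-sim"));
        try {
            return CompletableFuture
                    // Stage 1: register the event source on the main thread.
                    .supplyAsync(() -> "subscribe@" + Thread.currentThread().getName(), main)
                    // Stage 2: do the heavy processing on the background thread.
                    .thenApplyAsync(s -> s + " -> process@" + Thread.currentThread().getName(), background)
                    // Stage 3: hand the result back to the main thread for the UI.
                    .thenApplyAsync(s -> s + " -> deliver@" + Thread.currentThread().getName(), main)
                    .get();
        } finally {
            main.shutdown();
            background.shutdown();
        }
    }

    public static void main(String[] args) throws Exception {
        // prints "subscribe@main-sim -> process@bg-sim -> deliver@main-sim"
        System.out.println(run());
    }
}
```

Applied to the question's transformer, this would presumably mean replacing events.subscribeOn(Schedulers.io()) in mergeEvents with events.subscribeOn(AndroidSchedulers.mainThread()).observeOn(Schedulers.io()), which has not been tested against the original project.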