2015-02-06 79 views
3

我有以下代码(*),它使用递归调用提供的observable的调度程序来实现轮询。RxJava:调用onError而不完成/取消订阅

(*)从https://github.com/ReactiveX/RxJava/issues/448

这是正常工作的启发,当我只有通过onNext事件给用户。但是,当我将onError事件传递给订阅者时,会调用取消订阅事件,这又会杀死调度程序。

我想也将错误传递给订阅者。任何想法如何实现?

public Observable<Status> observe() { 
    return Observable.create(new PollingSubscriberAction<>(service.getStatusObservable(), 5, TimeUnit.SECONDS)); 
} 

private class PollingSubscriberAction<T> implements Observable.OnSubscribe<T> { 
    private Subscription subscription; 
    private Subscription innerSubscription; 
    private Scheduler.Worker worker = Schedulers.newThread().createWorker(); 

    private Observable<T> observable; 
    private long delayTime; 
    private TimeUnit unit; 

    public PollingSubscriberAction(final Observable<T> observable, long delayTime, TimeUnit unit) { 
     this.observable = observable; 
     this.delayTime = delayTime; 
     this.unit = unit; 
    } 

    @Override 
    public void call(final Subscriber<? super T> subscriber) { 
     subscription = worker.schedule(new Action0() { 
      @Override 
      public void call() { 
       schedule(subscriber, true); 
      } 
     }); 

     subscriber.add(Subscriptions.create(new Action0() { 
      @Override 
      public void call() { 
       subscription.unsubscribe(); 
       if (innerSubscription != null) { 
        innerSubscription.unsubscribe(); 
       } 
      } 
     })); 
    } 

    private void schedule(final Subscriber<? super T> subscriber, boolean immediately) { 
     long delayTime = immediately ? 0 : this.delayTime; 
     subscription = worker.schedule(createInnerAction(subscriber), delayTime, unit); 
    } 

    private Action0 createInnerAction(final Subscriber<? super T> subscriber) { 
     return new Action0() { 
      @Override 
      public void call() { 
       innerSubscription = observable.subscribe(new Observer<T>() { 
        @Override 
        public void onCompleted() { 
         schedule(subscriber, false); 
        } 

        @Override 
        public void onError(Throwable e) { 
         // Doesn't work. 
         // subscriber.onError(e); 
         schedule(subscriber, false); 
        } 

        @Override 
        public void onNext(T t) { 
         subscriber.onNext(t); 
        } 
       }); 
      } 
     }; 
    } 
} 

回答

3

所以我一直玩这个一段时间,我不认为这是可能的方式,你这样做。调用onErroronCompleted可以终止流,翻转SafeSubscriber包装中的done标志,并且没有办法重置它。

我可以看到2个选项可用 - 我认为不是特别优雅,但会工作。

1 - UnsafeSubscribe。可能不是最好的主意,但它的工作原理,因为它不是将Subscriber包装在SafeSubscriber中,而是直接调用它。最好阅读Javadoc,看看这对你是否合适。或者,如果您感觉冒险,请编写您自己的SafeSubscriber,您可以在其中重置完成标志或类似信息。以您为例,请拨打电话:

observe.unsafeSubscribe(...) 

2 - 实施类似于this example的操作。我很感激它在C#中,但它应该是可读的。简单地说 - 你想创建一个Pair<T, Exception>类,然后,而不是调用onError,请致电onNext并设置您的配对的异常端。您的用户必须更聪明地检查这一对的每一边,并且您可能需要在源ObservableObservable<Pair<T, Exception>>之间进行一些数据转换,但我不明白为什么它不起作用。

如果有人有这样做,我会真的很有兴趣看到另一种做法。

希望这有助于

+0

你可以举例unsafeSubscribe()吗?一个简单的方法调用并不能解决问题。 – Alexandr 2016-01-30 06:18:29

+0

@Alexandr这个答案已经过去了一段时间,自那以后API已经发生了很大的变化,所以这可能不再有效。此外,由OP记录的解决方案可能更好。如果问题是相同的,我会使用该解决方案,或发布一个新问题。 – Will 2016-01-30 07:19:31

3

由于@Will指出,你不能直接调用onError而不终止的观察到。由于您只能拨打onNext,因此我决定使用Notification将值和throwable包装在一个对象中。

import rx.*; 
import rx.functions.Action0; 
import rx.schedulers.Schedulers; 
import rx.subscriptions.Subscriptions; 

import java.util.concurrent.TimeUnit; 

public class PollingObservable { 
    public static <T> Observable<Notification<T>> create(Observable<T> observable, long delayTime, TimeUnit unit) { 
     return Observable.create(new OnSubscribePolling<>(observable, delayTime, unit)); 
    } 

    private static class OnSubscribePolling<T> implements Observable.OnSubscribe<Notification<T>> { 
     private Subscription subscription; 
     private Subscription innerSubscription; 
     private Scheduler.Worker worker = Schedulers.newThread().createWorker(); 

     private Observable<T> observable; 
     private long delayTime; 
     private TimeUnit unit; 

     private boolean isUnsubscribed = false; 

     public OnSubscribePolling(final Observable<T> observable, long delayTime, TimeUnit unit) { 
      this.observable = observable; 
      this.delayTime = delayTime; 
      this.unit = unit; 
     } 

     @Override 
     public void call(final Subscriber<? super Notification<T>> subscriber) { 
      subscription = worker.schedule(new Action0() { 
       @Override 
       public void call() { 
        schedule(subscriber, true); 
       } 
      }); 

      subscriber.onStart(); 
      subscriber.add(Subscriptions.create(new Action0() { 
       @Override 
       public void call() { 
        isUnsubscribed = true; 

        subscription.unsubscribe(); 
        if (innerSubscription != null) { 
         innerSubscription.unsubscribe(); 
        } 
       } 
      })); 
     } 

     private void schedule(final Subscriber<? super Notification<T>> subscriber, boolean immediately) { 
      if (isUnsubscribed) { 
       return; 
      } 

      long delayTime = immediately ? 0 : this.delayTime; 
      subscription = worker.schedule(createInnerAction(subscriber), delayTime, unit); 
     } 

     private Action0 createInnerAction(final Subscriber<? super Notification<T>> subscriber) { 
      return new Action0() { 
       @Override 
       public void call() { 
        innerSubscription = observable.subscribe(new Observer<T>() { 
         @Override 
         public void onCompleted() { 
          schedule(subscriber, false); 
         } 

         @Override 
         public void onError(Throwable e) { 
          subscriber.onNext(Notification.<T>createOnError(e)); 
          schedule(subscriber, false); 
         } 

         @Override 
         public void onNext(T t) { 
          subscriber.onNext(Notification.createOnNext(t)); 
         } 
        }); 
       } 
      }; 
     } 
    } 
} 

要使用此功能,您可以直接使用的通知:

PollingObservable.create(service.getStatus(), 5, TimeUnit.SECONDS) 
    .subscribe(new Action1<Notification<Status>>() { 
     @Override 
     public void call(Notification<Status> notification) { 
      switch (notification.getKind()) { 
       case OnNext: 
        Status status = notification.getValue(); 
        // handle onNext event 
        break; 
       case OnError: 
        Throwable throwable = notification.getThrowable(); 
        // handle onError event 
        break; 
      } 
     } 
    }); 

或者您可以使用通知上的accept方法使用常规的可观察:

PollingObservable.create(service.getStatus(), 5, TimeUnit.SECONDS) 
     .subscribe(new Action1<Notification<Status>>() { 
      @Override 
      public void call(Notification<Status> notification) { 
       notification.accept(statusObserver); 
      } 
     }); 

Observer<Status> statusObserver = new Observer<Status>() { 
    // ... 
} 

UPDATE 2015-02-24

看来轮询可观察者有时不能正常工作,因为内部可观察者即使在取消订阅后也会调用onCompleteonError,从而重新安排自己。我添加了isUnsubscribed标志来防止发生这种情况。

+0

真的很喜欢使用通知类 - 比使用Pair更清洁 – Will 2015-02-13 09:45:57