2014-09-03 70 views
0

我有以下可观察链,A-> B,其中A在DB中创建一个条目,B将一个作业排入rabbitMQ。我想要做的是有第三个C运行,当且仅当B抛出一个异常(它必须删除/还原输入到数据库中的数据)。该异常应该传播给调用者。我正在使用onErrorResumeNext,但似乎工作,只有当我有一个明确的(空)订户。注意:下面代码中的委托返回A并且queueSender返回B.这是正确的,还是有更好的方法来做到这一点?如何使用Observable.onErrorResumeNext来执行分支并传播异常

public Observable<Long> create(final Message m) { 
    return delegate.create(m).flatMap(new Func1<Long, Observable<Long>>() { 
     @Override 
     public Observable<Long> call(final Long aLong) { 
      Observable<Void> observableB = queueSender.observe(aLong) 

      observableB.onErrorResumeNext(new Func1<Throwable, Observable<Void>>() { 
       @Override 
       public Observable<Void> call(Throwable throwable) { 
        delegate.delete(aLong).toBlockingObservable().single(); 
        return null; 
       } 
      }).subscribe(new VoidSubscriber()); 

      return timelineObservable.map(new Func1<Void, Long>() { 
       @Override 
       public Long call(Void aVoid) { 
        return aLong; 
       } 
      }); 
     } 
    }); 
} 

回答

0

摆脱VoidSubscriber的简单方法是将observableB与您的timelineObservable一起压缩。由于onErrorResumeNext()会吞噬该错误,因此您还需要将onErrorResumeNext()更改为doOnError()以传播该错误。以下是修改后的代码:

public Observable<Long> createOld(final Message m) { 
    return delegate.create(m).flatMap(new Func1<Long, Observable<Long>>() { 
     @Override 
     public Observable<Long> call(final Long aLong) { 
      final Observable<Void> observableB = 
        queueSender 
          .observe(aLong) 
          .doOnError(
            new Action1<Throwable>() { 
             @Override 
             public void call(Throwable throwable) { 
              delegate.delete(aLong).toBlockingObservable().single(); 
             } 
            } 
          ); 

      final Observable<Long> observableTimeline = 
        timelineObservable 
          .map(new Func1<Void, Long>() { 
           @Override 
           public Long call(Void aVoid) { 
            return aLong; 
           } 
          }); 

      return Observable 
        .zip(
          observableB, 
          observableTimeline, 
          new Func2<Void, Long, Long>() { 
           @Override 
           public Long call(Void aVoid, Long aLong) { 
            return aLong; 
           } 
          } 
        ); 
     } 
    }); 
}