0
我有以下可观察链,A-> B,其中A在DB中创建一个条目,B将一个作业排入rabbitMQ。我想要做的是有第三个C运行,当且仅当B抛出一个异常(它必须删除/还原输入到数据库中的数据)。该异常应该传播给调用者。我正在使用onErrorResumeNext,但似乎工作,只有当我有一个明确的(空)订户。注意:下面代码中的委托返回A并且queueSender返回B.这是正确的,还是有更好的方法来做到这一点?如何使用Observable.onErrorResumeNext来执行分支并传播异常
public Observable<Long> create(final Message m) {
return delegate.create(m).flatMap(new Func1<Long, Observable<Long>>() {
@Override
public Observable<Long> call(final Long aLong) {
Observable<Void> observableB = queueSender.observe(aLong)
observableB.onErrorResumeNext(new Func1<Throwable, Observable<Void>>() {
@Override
public Observable<Void> call(Throwable throwable) {
delegate.delete(aLong).toBlockingObservable().single();
return null;
}
}).subscribe(new VoidSubscriber());
return timelineObservable.map(new Func1<Void, Long>() {
@Override
public Long call(Void aVoid) {
return aLong;
}
});
}
});
}