我有以下类:呼叫对象的发射另一个改造呼叫
public class SessionStore {
Subject<Session, Session> subject;
public SessionStore() {
subject = new SerializedSubject<>(BehaviorSubject.create(new Session());
}
public void set(Session session) {
subject.onNext(session);
}
public Observable<UserSession> observe() {
return subject.distinctUntilChanged();
}
}
在活动我观察了会议,并在每个变化进行网络操作:
private Subscription init() {
return sessionStore
.observe()
.flatMap(new Func1<Session, Observable<Object>>() {
@Override
public Observable<Object> call(Session session) {
return (session.isValid()
? retrofitService.getThingForValid()
: retrofitService.getThingForInalid())
.subscribeOn(Schedulers.io());
}
})
.subscribe(...);
}
现在我有一个Okhttp请求拦截器,当网络响应是非200代码时,我将会话实例从有效设置为无效。
这是发生了什么:
- 在初始订阅会话店
getThingForValid()
执行和失败。 - OkHttp拦截失败并设置新会话。
- 会话存储会发出新的,现在无效的会话。
- 新排放执行
getThingForInvalid()
方法。
重要的是要知道这个执行发生在前一次Retrofit调用中。这是因为OkHttp客户端被Retrofit包装,并且所有拦截器都在Retrofit返回之前执行。
考虑到这一点,您意识到第二个调用正在被Retrofit执行和处理,而第一个调用还没有完成。
- 第一次调用完成后,它会抛出HttpException,因为响应是非200代码。
- xception杀死了rx流并使用它进行第二次调用。
我试图忽略流中的这个异常,但第二次调用通过Retrofit取消。
你有什么想法如何使我的概念工作,请?
你有没有尝试过'.onErrorResumeNext(err - > Observable.empty())'''静脉? –
是的,它有相同的输出。它必须与改造内部结构有关,而不是与RxJava结合。 – bakua
你能抽象地解释一下,你试图达到什么样的目标以及你原来的问题是什么? –