我正在尝试反应堆库,我无法弄清楚为什么在单声道以下永远不会返回onNext或onComplete调用。我想我错过了非常平凡的事情。这是一个示例代码。无法接收onNext和onComplete订阅的单声道呼叫
MyServiceService service = new MyServiceService();
service.save("id")
.map(myUserMono -> new MyUser(myUserMono.getName().toUpperCase(), myUserMono.getId().toUpperCase()))
.subscribe(new Subscriber<MyUser>() {
@Override
public void onSubscribe(Subscription s) {
System.out.println("Subscribed!" + Thread.currentThread().getName());
}
@Override
public void onNext(MyUser myUser) {
System.out.println("OnNext on thread " + Thread.currentThread().getName());
}
@Override
public void onError(Throwable t) {
System.out.println("onError!" + Thread.currentThread().getName());
}
@Override
public void onComplete() {
System.out.println("onCompleted!" + Thread.currentThread().getName());
}
});
}
private static class MyServiceService {
private Repository myRepo = new Repository();
public Mono<MyUser> save(String userId) {
return myRepo.save(userId);
}
}
private static class Repository {
public Mono<MyUser> save(String userId) {
return Mono.create(myUserMonoSink -> {
Future<MyUser> submit = exe.submit(() -> this.blockingMethod(userId));
ListenableFuture<MyUser> myUserListenableFuture = JdkFutureAdapters.listenInPoolThread(submit);
Futures.addCallback(myUserListenableFuture, new FutureCallback<MyUser>() {
@Override
public void onSuccess(MyUser result) {
myUserMonoSink.success(result);
}
@Override
public void onFailure(Throwable t) {
myUserMonoSink.error(t);
}
});
});
}
private MyUser blockingMethod(String userId) throws InterruptedException {
Thread.sleep(5000);
return new MyUser("blocking", userId);
}
}
以上代码仅打印Subcribed!main。什么我无法弄清楚是为什么在未来的回调不会通过myUserMonoSink.success