2017-03-10 67 views
0

我正在尝试反应堆库,我无法弄清楚为什么在单声道以下永远不会返回onNext或onComplete调用。我想我错过了非常平凡的事情。这是一个示例代码。无法接收onNext和onComplete订阅的单声道呼叫

MyServiceService service = new MyServiceService(); 
    service.save("id") 
      .map(myUserMono -> new MyUser(myUserMono.getName().toUpperCase(), myUserMono.getId().toUpperCase())) 
      .subscribe(new Subscriber<MyUser>() { 
       @Override 
       public void onSubscribe(Subscription s) { 
        System.out.println("Subscribed!" + Thread.currentThread().getName()); 
       } 

       @Override 
       public void onNext(MyUser myUser) { 
        System.out.println("OnNext on thread " + Thread.currentThread().getName()); 

       } 

       @Override 
       public void onError(Throwable t) { 
        System.out.println("onError!" + Thread.currentThread().getName()); 

       } 

       @Override 
       public void onComplete() { 
        System.out.println("onCompleted!" + Thread.currentThread().getName()); 

       } 
      }); 


} 

private static class MyServiceService { 
    private Repository myRepo = new Repository(); 

    public Mono<MyUser> save(String userId) { 
     return myRepo.save(userId); 
    } 
} 

private static class Repository { 

    public Mono<MyUser> save(String userId) { 
     return Mono.create(myUserMonoSink -> { 
      Future<MyUser> submit = exe.submit(() -> this.blockingMethod(userId)); 
      ListenableFuture<MyUser> myUserListenableFuture = JdkFutureAdapters.listenInPoolThread(submit); 
      Futures.addCallback(myUserListenableFuture, new FutureCallback<MyUser>() { 
       @Override 
       public void onSuccess(MyUser result) { 
        myUserMonoSink.success(result); 
       } 

       @Override 
       public void onFailure(Throwable t) { 
        myUserMonoSink.error(t); 
       } 
      }); 
     }); 
    } 

    private MyUser blockingMethod(String userId) throws InterruptedException { 
     Thread.sleep(5000); 
     return new MyUser("blocking", userId); 
    } 
} 

以上代码仅打印Subcribed!main。什么我无法弄清楚是为什么在未来的回调不会通过myUserMonoSink.success

回答

2

重要的推动价值要记住的是,一个FluxMono异步,大部分的时间。

订阅后,保存用户的异步处理将从执行程序开始,但在.subscribe(...)后继续执行主代码。

因此,main线程退出,在任何内容被推送到Mono之前终止您的测试。

[侧栏]:什么时候有同步?

当数据来源是Flux/Mono同步工厂方法时。但增加的先决条件是运营商链的其余部分不会切换执行上下文。这可能会明确地发生(您使用publishOnsubscribeOn运算符)或隐式(某些运算符(如时间相关运算符),例如delayElements,运行在单独的Scheduler上)。

简而言之,您的源代码在exeExecutorService线程中运行,因此Mono确实是异步的。另一方面,您的片段在main上运行。

如何解决这个问题

观察实验的Mono正确的行为(而不是在生产中完全异步代码),有几个可能性:

  • 保持subscribe与system.out.printlns,但在onCompleteonError内部添加一个new CountDownLatch(1).countDown()await之后的倒计数锁存器subscribe
  • 使用.log().block()而不是.subscribe(...)。您将失去对每个事件做什么的定制,但log()将为您打印出来(如果您已配置了日志框架)。 block()将恢复到阻塞模式,并完成我上面提到的CountDownLatch。它返回一次可用的值,或者在出错时抛出一个Exception。的
  • 代替log()您可以自定义日志记录或使用.doOnXXX(...)方法等副作用(有一对几乎每一个类型的事件事件+的组合,如:doOnSubscribedoOnNext ...)

如果”重新进行单元测试,使用reactor-tests项目中的StepVerifier。当您致电.verify()时,它将订阅flux/mono并等待事件。请参阅参考指南chapter on testing(以及一般参考指南的其余部分)。

1

问题是在创建的匿名类onSubscribe方法中什么都不做。 如果你看看LambdaSubscriber的实现,它会请求一些事件。 因为它有一些预定义的逻辑,所以它更容易扩展BaseSubscriber

所以你的用户实现将是:

MyServiceService service = new MyServiceService(); 
service.save("id") 
     .map(myUserMono -> new MyUser(myUserMono.getName().toUpperCase(), myUserMono.getId().toUpperCase())) 
     .subscribe(new BaseSubscriber<MyUser>() { 
       @Override 
       protected void hookOnSubscribe(Subscription subscription) { 
        System.out.println("Subscribed!" + Thread.currentThread().getName()); 
        request(1); // or requestUnbounded(); 
       } 

       @Override 
       protected void hookOnNext(MyUser myUser) { 
        System.out.println("OnNext on thread " + Thread.currentThread().getName()); 

        // request(1); // if wasn't called requestUnbounded() 2 
       } 

       @Override 
       protected void hookOnComplete() { 
        System.out.println("onCompleted!" + Thread.currentThread().getName()); 
       } 

       @Override 
       protected void hookOnError(Throwable throwable) { 
        System.out.println("onError!" + Thread.currentThread().getName()); 
       } 

      }); 

也许它不是最好的实现,我是新来的太反应堆。

Simon的答案对测试异步代码有很好的解释。