2017-07-28 28 views
0

我想单声道的映射函数内部转换数据:如何将长时间运行的任务返回到地图内?

long result = 0.0; 

return Mono.just(result).map(value -> { 
    // do some long running transformation here 
    // and assign it to result (maybe 5 seconds task) 
    // in our case a request: 

    Mono<Result> resultObject = service.getResult(); 

    resultObject.subscribe(new Subscriber<Result>() { 
     @Override 
     public void onSubscribe(Subscription s) { 
      System.out.println("subscribe: " + System.currentTimeMillis()); 

      s.request(1); 
     } 

     @Override 
     public void onNext(Result result) { 
      System.out.println("on next: " + System.currentTimeMillis()); 

      value = result.getValue(); // this is not 0.0 
     } 

     @Override 
     public void onError(Throwable t) { 
      System.out.println("error " + t); 
     } 

     @Override 
     public void onComplete() { 
      System.out.println("complete"); 
     } 
    }); 

    return value; 
}); 

如果我把这叫做我总是0.0作为结果,因此map函数完成之前返回。对我来说这没有多大意义。在返回之前,我还应该如何改变我的结果?

编辑

我可以做到以下几点,但在我看来,这不是一个最佳的解决方案:

final CountDownLatch latch = new CountDownLatch(1); 
long result = 0.0; 

return Mono.just(result).map(value -> { 
    // do some long running transformation here 
    // and assign it to result (maybe 5 seconds task) 
    // in our case a request: 

    Mono<Result> resultObject = service.getResult(); 

    resultObject.subscribe(new Subscriber<Result>() { 
     @Override 
     public void onSubscribe(Subscription s) { 
      System.out.println("subscribe: " + System.currentTimeMillis()); 

      s.request(1); 
     } 

     @Override 
     public void onNext(Result result) { 
      System.out.println("on next: " + System.currentTimeMillis()); 

      value = result.getValue(); // this is not 0.0 

      latch.countDown(); 
     } 

     @Override 
     public void onError(Throwable t) { 
      System.out.println("error " + t); 
     } 

     @Override 
     public void onComplete() { 
      System.out.println("complete"); 
     } 
    }); 

    try { 
     latch.await(); 

     return value; 
    } catch(InterruptedException e) { 
     e.printStackTrace(); 

     return -1.0; 
    } 
}); 

回答

1

这听起来完全像什么flatMap是:如果你的长期运行的任务是异步并且可以表示为Publisher<T>,那么它可以由flatMap异步触发。

请注意,Mono#flatMap(Function)被称为Mono#then(Function)3.0.x

所以在3.0.7:

Mono.just(param).then(p -> service.getResult(p)); 

而且在3.1.0.M3:

Mono.just(param).flatMap(p -> service.getResult(p)); 

需要注意的是,如果你不使用的值(服务没有参数),那么你可能只需要提供延续Mono,使用then(Mono)(这是在双方3.0.x和3.1.X)有效:

Mono.just(paramThatGetsIgnored).then(service.getResult()); 

(但在这种情况下,Mono.just(...)的起点并不相关)

+0

请注意,从3.0.x'then'迁移到3.1.0'flatMap'的最佳策略是将'Mono .flatMap'与'Mono.flatMapMany'然后与'Mono.flatMap(Function)'一起使用'Mono.then(Function)'。没有一个'Function'的所有其他'then'变体保持不变。 –

相关问题