2017-10-20 95 views
0

假设我有3个服务。 首先我打电话serviceA,它返回一个CompletableFuture。 之后,结果我平常打电话serviceBserviceCthenCompose())。 当我得到所有结果后,我想结合所有3个结果并将它返回给某个调用者。 在主叫方,我想等到整个X millseconds的全过程,使:中断CompletableFuture默认值为

  • 如果我打断进程,同时serviceA通话过程中:抛出一些异常(所以它是强制性的)
  • 如果我中断了进程,而serviceBserviceC调用正在进行中:返回一些默认值(它们是可选的)。 这是我尝试使用的CompletableFuture

getNow(fallback)方法请查看下面我的代码片段,如果我在serviceBserviceC呼叫使用长期拖延的原因,我总是一个TimeoutException结束。 我该怎么做?

public CompletableFuture<Result> getFuture() { 
    CompletableFuture<A> resultA = serviceA.call(); 
    CompletableFuture<B> resultB = resultA.thenCompose(a -> serviceB.call(a)); 
    CompletableFuture<C> resultC = resultA.thenCompose(a -> serviceC.call(a)); 
    return CompletableFuture.allOf(resultB, resultC) 
      .thenApply(ignoredVoid -> combine(
        resultA.join(), 
        resultB.getNow(fallbackB), 
        resultC.getNow(fallbackC)); 
} 

public Result extractFuture(CompletableFuture<Result> future) { 
    Result result; 
    try { 
     result = future.get(timeOut, MILLISECONDS); 
    } catch (ExecutionException ex) { 
     ... 
    } catch (InterruptedException | TimeoutException ex) { 
     // I always ends up here... 
    } 
    return result; 
} 

回答

2

通过.allOf(resultB, resultC)返回未来两,resultBresultC完成时才完成,因此,相关的功能ignoredVoid -> combine(resultA.join(), resultB.getNow(fallbackB), resultC.getNow(fallbackC)只会得到,如果resultBresultC完成,并提供一个备用没有任何效果可言评估。

在这些函数中调用get()通常是不可能的。考虑到在不同的时间和不同的超时时间,未来可能会有任意数量的调用,但是传递给thenApply的函数只能评估一次,这应该是显而易见的。

getFuture()处理消费者指定超时的唯一方法是改变它返回接收超时功能:

interface FutureFunc<R> { 
    R get(long time, TimeUnit u) throws ExecutionException; 
} 
public FutureFunc<Result> getFuture() { 
    CompletableFuture<A> resultA = serviceA.call(); 
    CompletableFuture<B> resultB = resultA.thenCompose(a -> serviceB.call(a)); 
    CompletableFuture<C> resultC = resultA.thenCompose(a -> serviceC.call(a)); 
    CompletableFuture<Result> optimistic = CompletableFuture.allOf(resultB, resultC) 
     .thenApply(ignoredVoid -> combine(resultA.join(), resultB.join(), resultC.join())); 
    return (t,u) -> { 
     try { 
      return optimistic.get(t, u); 
     } catch (InterruptedException | TimeoutException ex) { 
      return combine(resultA.join(), resultB.getNow(fallbackB), 
              resultC.getNow(fallbackC)); 
     } 
    }; 
} 

public Result extractFuture(FutureFunc<Result> future) { 
    Result result; 
    try { 
     result = future.get(timeOut, MILLISECONDS); 
    } catch (ExecutionException ex) { 
     ... 
    } 
    return result; 
} 

现在,随着不同的超时不同的呼叫可以被制成,可能有不同的结局只要B或C尚未完成。并不是说combine方法有些含糊不清,这也可能需要一些时间。

您可以将功能更改为

return (t,u) -> { 
    try { 
     if(resultB.isDone() && resultC.isDone()) return optimistic.get(); 
     return optimistic.get(t, u); 
    } catch (InterruptedException | TimeoutException ex) { 
     return combine(resultA.join(), resultB.getNow(fallbackB), 
             resultC.getNow(fallbackC)); 
    } 
}; 

等待的可能已经运行combine完成。在任何情况下,都不能保证结果在指定的时间内交付,因为即使使用B和C的回退值,也会执行combine,这可能需要一段任意时间。

如果你想取消样的行为,即所有结果查询返回相同的结果,即使它已被使用,由于第一个查询回退值来计算,你可以改用

public FutureFunc<Result> getFuture() { 
    CompletableFuture<A> resultA = serviceA.call(); 
    CompletableFuture<B> resultB = resultA.thenCompose(a -> serviceB.call(a)); 
    CompletableFuture<C> resultC = resultA.thenCompose(a -> serviceC.call(a)); 
    CompletableFuture<Void> bAndC = CompletableFuture.allOf(resultB, resultC); 
    CompletableFuture<Result> result = bAndC 
     .thenApply(ignoredVoid -> combine(resultA.join(), resultB.join(), 
                  resultC.join())); 
    return (t,u) -> { 
     try { 
      bAndC.get(t, u); 
     } catch (InterruptedException|TimeoutException ex) { 
      resultB.complete(fallbackB); 
      resultC.complete(fallbackC); 
     } 
     try { 
      return result.get(); 
     } catch (InterruptedException ex) { 
      throw new ExecutionException(ex); 
     } 
    }; 
} 

有了这个,单个FutureFunc上的所有查询将始终返回相同的结果,即使它基于由于第一次超时而产生的回退值。该变体也一直排除从超时执行combine

当然,如果根本不打算使用不同的超时,则可以重构getFuture()以提前获得所需的超时,例如,作为参数。这将大大简化实施过程,并可能再次返回未来:

public CompletableFuture<Result> getFuture(long timeOut, TimeUnit u) { 
    CompletableFuture<A> resultA = serviceA.call(); 
    CompletableFuture<B> resultB = resultA.thenCompose(a -> serviceB.call(a)); 
    CompletableFuture<C> resultC = resultA.thenCompose(a -> serviceC.call(a)); 
    ScheduledExecutorService e = Executors.newSingleThreadScheduledExecutor(); 
    e.schedule(() -> resultB.complete(fallbackB), timeOut, u); 
    e.schedule(() -> resultC.complete(fallbackC), timeOut, u); 
    CompletableFuture<Void> bAndC = CompletableFuture.allOf(resultB, resultC); 
    bAndC.thenRun(e::shutdown); 
    return bAndC.thenApply(ignoredVoid -> 
          combine(resultA.join(), resultB.join(), resultC.join())); 
} 
+0

这正是我所期待的。它的功能就像一个魅力,感谢您的详细解答! – kornisb