2016-04-29 60 views
2

我想调用CompletableFuture.supplyAsync()将阻塞任务委托给另一个线程。一旦完成任务,我希望CompletableFuture.thenAccept使用者在调用线程的上下文中运行。从调用线程运行CompletableFuture.thenAccept?

例如:

// Thread 1 

CompletableFuture.supplyAsync(() -> { 
    // Thread 2 

    return BlockingMethod(); 
}).thenAccept((
    Object r) -> { 

    // Thread 1 
}); 

以下代码表明CompletableFuture.thenAccept运行在其自己的线程;可能是相同的池CompletableFuture.supplyAsync我得到相同的线程ID,当我运行它:

System.out.println("Sync thread supply " + Thread.currentThread().getId()); 

CompletableFuture.supplyAsync(() -> { 

    System.out.println("Async thread " + Thread.currentThread().getId()); 

    try { 
     Thread.sleep(2000); 
    } 
    catch (Exception e) { 
     e.printStackTrace(); 
    } 

    return true; 
}).thenAccept((
    Boolean r) -> { 

    System.out.println("Sync thread consume " + Thread.currentThread().getId()); 
}); 

Thread.sleep(3000); 

是否有可能同时有与调用线程CompletableFuture.thenAccept运行?

回答

3

CompletableFuture只会执行ConsumerthenAccept注册接收CompletableFuture(由supplyAsync返回一个)完成时,因为它需要它与完成值。

如果调用thenAccept时接收方CompletableFuture已完成,则Consumer将在调用线程中执行。否则,它将执行任何线程完成Supplier提交给supplyAsync

是否有可能同时有运行CompletableFuture.thenAccept与 调用线程?

这是一个令人困惑的问题,因为线程一次只能运行一件事。对于单个线程,没有并发同时是跨越多个线程的属性。

如果你想Consumer,直到将来完成了对CompletableFuture调用thenAccept,然后join在同一线程上运行,阻止该线程。然后您可以自己执行Consumer或致电thenAccept为您执行它。

例如

CompletableFuture<Boolean> receiver = CompletableFuture.supplyAsync(() -> { 
    System.out.println("Async thread " + Thread.currentThread().getId()); 

    try { 
     Thread.sleep(2000); 
    } catch (Exception e) { 
     e.printStackTrace(); 
    } 

    return true; 
}); 

receiver.join(); 
Consumer<Boolean> consumer = (Boolean r) -> { 
    System.out.println("Sync thread consume " + Thread.currentThread().getId()); 
}; 

consumer.accept(receiver.get()); 

(异常处理省略)


如果你想Consumer并行与供给supplyAsyncSupplier运行,这是不可能的。 Consumer意味着消耗Supplier产生的价值。该值在Supplier完成之前不可用。

0

如果我知道你要到餐​​桌的I/O密集型任务,你的想法的权利,但做的所有处理中的“事件循环”(认为JavaScript)的,那么你的代码可以被转换为

Executor eventLoop = Executors.newSingleThreadExecutor(); 
Executor ioMultiplexor = Executors.newCachedThreadPool(); 

eventLoop.execute(() -> { 
    System.out.println("event loop thread supply " + Thread.currentThread().getId()); 
    CompletableFuture.supplyAsync(() -> { 
     System.out.println("I/O multiplexor thread " + Thread.currentThread().getId()); 
     try { 
      Thread.sleep(2000); 
     } catch (Exception e) { 
      e.printStackTrace(); 
     } 
     return true; 
    }, ioMultiplexor).thenAcceptAsync((Boolean r) -> { 
     System.out.println("event loop thread consume " + Thread.currentThread().getId()); 
    }, eventLoop); 
}); 

Thread.sleep(3000); 
// shut down executors 

这将打印

event loop thread supply 10 
I/O multiplexor thread 11 
event loop thread consume 10 

如果此代码用于一些请求处理,其中可以有很多并发请求,你可能会希望有一个全球eventLoop和一个全局ioMultiplexer,而你也将要释放的主请求韩德林g线程完成后将任务提交到eventLoop