2017-09-27 195 views
0

假设我想要并行发送多个数据库查询(或web服务请求),并在之后进行聚合。我会更好地使用stream API还是CompletableFuture使用Stream API或CompletableFuture进行并行db查询?

STREAM:

List<Result> result = requests.parallelStream() 
           .map(req -> query(req.getFirstname, req.getLastname)) 
           .collect(toList()); 

//a database, or a webservice 
private Result query(firstname, lastname); 

期货:

List<CompletableFuture> futures; 
for (QueryReq req: requests) { //given input 
    futures.add(CompletableFuture 
         .supplyAsync(() -> query(req.getFirstname, req.getLastname)); 
} 

//wait for all futures to complete and collect the results 
List<Result> results = new ArrayList<>(); 
for (CompleteableFuture f : futures) { 
    results.add(f.get()); 
} 

虽然流肯定是更简洁,但哪一个应首选是什么原因呢?

旁注:我知道我可以用sql = :firstname IN (..) and ... :lastname IN(..)更容易地查询这个例子。但这只是使用流或期货的一个例子。

该任务也可以并行发送多个webservice请求,而不是db查询。

+1

你会读这吗?它可能无法工作... https://stackoverflow.com/questions/44029856/using-jpa-objects-in-parallel-streams-with-spring – Eugene

+0

我会说广泛和基于意见。主要的牛肉是相同的(并行运行的东西),你的示例代码甚至都使用公共池。我想说这主要取决于其他代码的存在。 – Kayaman

+0

@Eugene当然取决于你在什么时候将EntityManager绑定到线程。给出的例子并未显示与链接问题类似的“线程交叉”。 – Kayaman

回答

1

正如您已经说过的:“流确实不那么冗长”,是不是更喜欢使用Stream?公平起见,我认为我们也应该用Java 8 Stream APIs重写第二个示例代码CompletableFuture

List<Result> result = requests.stream() 
     .map(req -> CompletableFuture.supplyAsync(() -> query(req.getFirstname, req.getLastname))) 
     .collect(toList()).stream() 
     .map(f -> f.get()).collect(toList()); 

看起来它仍然是很多冗长/长于:

List<Result> result = requests.parallelStream() 
     .map(req -> query(req.getFirstname, req.getLastname)).collect(toList()); 

不过,我认为这里的关键点是如何设置的并发线程数:通过并行流,线程数是固定的通过ForkJoinPool.commonPool,CPU核心号。通常这对于发送大量的web/db请求来说太小了。例如,如果有数十/数百个web/db请求发送,大多数情况下,发送具有20个或更多线程的请求的速度要快于ForkJoinPool.commonPool中定义的线程数。个人而言,我也不知道用什么方便的方式来指定并行流中的线程号。这里有一些你可以参考的答案:custom-thread-pool-in-java-8-parallel-stream

+0

所以我认为可以得出结论:如果只需要几个并行线程,就使用'streams', CompletableFuture'为更多数量的并行+来控制池大小。 – membersound

+0

没错。如果并行流提供的线程数足够。没有理由不使用并行数据流,毕竟这很容易/简单。不幸的是,CompletableFuture:https://docs.oracle.com/javase/8/docs/api/java/util/concurrent/CompletableFuture.html#runAsync-java.lang.Runnable-也使用ForkJoinPool.commonPool()。你需要找到我发布的问题的方式/答案。 –