2011-03-09 51 views
1

我完全不熟悉jsr166y库,并且使用forkjoin库编写了一个例程,该库分割了一个查询并且同时运行它与数据库副本。我在下面放了一个片段。 SelectTask扩展了RecursiveTask。Java ForkJoin Future似乎过早完成

ForkJoinExecutor fjPool; 
    Future queryResultsFut = null; 
     for (int i = 1; i <= lastBatchNum; i++) { 

...

SelectTask selectMatchesRecursiveTask = new SelectMatchesTask(loadBalancer.getDao(), thisRuleBatch, queryResults); 
    queryResultsFut = fjPool.submit(selectMatchesRecursiveTask); 
} 

queryResultsFut.get(); 

的get方法的调用是为了阻止父线程,直到所有查询结果返回,这样处理可以在汇总结果开始。

在CI环境中运行一段时间后,我发现现在并不总是这样。当数据库较慢时,即使任务仍在运行,线程也会继续。这在我看来与我阅读的文档相矛盾。

也许我这样做是错误的方式?我应该扩展ForkJoinTask而不是RecursiveTask吗?

回答

3

你可能不应该使用ForkJoin。 FJ框架是专门为CPU密集型的非阻塞任务并行性设计的,但您专门用它来阻塞任务(外部数据库查询)。我建议你使用正常的执行者框架来执行你正在做的事情。

与您的问题相匹配的FJ的唯一方面是任务分解。尽管如此,通过简单的n路分割或更复杂的递归策略,手动操作不会太困难。

+0

感谢您的启发。您是否会知道执行程序框架是否包含将视为“假设所有已完成”方面的内容,这些方面我都假设为fork连接的设计目的? – barrymac 2011-03-10 11:38:03

+0

其实它看起来像我应该使用Phaser来实现带有ForkJoinPool的CyclicBarrier:http://gee.cs.oswego.edu/dl/jsr166/dist/jsr166ydocs/jsr166y/Phaser.html – barrymac 2011-03-10 11:50:08

+0

不知道为什么你想要等待它们全部完成,但是'ExecutorService.invokeAll(Collection >)'返回'List >'的结果。当你遍历它并获得每个将来阻塞的结果,直到未来完成为止,因此,在完成所有工作之前,不要完成迭代。如果你想以完成顺序得到结果,你可以使用'CompletionService'。 – 2011-03-10 23:00:43

1

RecursiveTask继承它从ForkJoinTask获得的功能,所以扩展ForkJoinTask不会有不同的效果。请记住,每次提交时都会得到不同的ForkJoinTask返回。你调用fjPool.submit多少次?如果你正在做的更多,那么一旦你将得到你提交的最后一项任务,并且queryResultsFut将在最后一项任务完成时完成(即从get中返回)。

由于您现在正在处理ForkJoin池,您应该在提交后返回ForkJoinTask而不是Future。 JF框架的主要目的是分治和处理。当你能够将问题分解成更小的类似问题时,它们是非常有用的,并行执行它们然后结合结果并返回。

+0

感谢在清除了。我曾假设该框架通过未来分享未来实例并使用它来等待所有分叉任务完成。 – barrymac 2011-03-10 11:15:58

+1

由于您正在使用RecursiveTask,因此我们的想法是递归提交到fork连接池。当你到达递归集合的末尾时,你会得到结果并将它与相同级别的相应分叉连接起来。您返回该结果并重复 – 2011-03-10 14:18:45

+2

但是,至少您要查找的内容考虑一个ExecutorCompletionService http://download.oracle.com/javase/1.5.0/docs/api/java/util/concurrent/ExecutorCompletionService.html – 2011-03-10 14:19:32