A 线程不足死锁发生在普通线程池中,如果池中的所有线程正在等待同一个池中的排队任务完成。 ForkJoinPool
通过从join()
调用中的其他线程窃取工作而不是简单地等待来避免此问题。例如:我可以使用ForkJoinPool的工作窃取行为来避免线程匮乏死锁吗?
private static class ForkableTask extends RecursiveTask<Integer> {
private final CyclicBarrier barrier;
ForkableTask(CyclicBarrier barrier) {
this.barrier = barrier;
}
@Override
protected Integer compute() {
try {
barrier.await();
return 1;
} catch (InterruptedException | BrokenBarrierException e) {
throw new RuntimeException(e);
}
}
}
@Test
public void testForkJoinPool() throws Exception {
final int parallelism = 4;
final ForkJoinPool pool = new ForkJoinPool(parallelism);
final CyclicBarrier barrier = new CyclicBarrier(parallelism);
final List<ForkableTask> forkableTasks = new ArrayList<>(parallelism);
for (int i = 0; i < parallelism; ++i) {
forkableTasks.add(new ForkableTask(barrier));
}
int result = pool.invoke(new RecursiveTask<Integer>() {
@Override
protected Integer compute() {
for (ForkableTask task : forkableTasks) {
task.fork();
}
int result = 0;
for (ForkableTask task : forkableTasks) {
result += task.join();
}
return result;
}
});
assertThat(result, equalTo(parallelism));
}
但是用ExecutorService
接口到ForkJoinPool
,工作窃取时,似乎没有发生。例如:
private static class CallableTask implements Callable<Integer> {
private final CyclicBarrier barrier;
CallableTask(CyclicBarrier barrier) {
this.barrier = barrier;
}
@Override
public Integer call() throws Exception {
barrier.await();
return 1;
}
}
@Test
public void testWorkStealing() throws Exception {
final int parallelism = 4;
final ExecutorService pool = new ForkJoinPool(parallelism);
final CyclicBarrier barrier = new CyclicBarrier(parallelism);
final List<CallableTask> callableTasks = Collections.nCopies(parallelism, new CallableTask(barrier));
int result = pool.submit(new Callable<Integer>() {
@Override
public Integer call() throws Exception {
int result = 0;
// Deadlock in invokeAll(), rather than stealing work
for (Future<Integer> future : pool.invokeAll(callableTasks)) {
result += future.get();
}
return result;
}
}).get();
assertThat(result, equalTo(parallelism));
}
从ForkJoinPool
的实施粗粗一看,所有的常规ExecutorService
API的使用ForkJoinTask
小号来实现,那么,为什么发生死锁我不知道。
我不认为偷工作可以避免死锁。一旦你陷入僵局,就无法取得进展。窃取工作只是通过允许线程在其队列为空时从其他队列中窃取来避免不平衡队列。 – markspace 2014-10-26 18:14:00
@markspace在'ForkJoinTask'的实现中,'join()'尝试从deque运行其他作业而不是拖延,这可以避免死锁。由于'ForkJoinPool.invokeAll()'将'Callable's转换为'ForkJoinTask's,我预计它也能工作。 – 2014-10-27 13:53:00