11

A 线程不足死锁发生在普通线程池中,如果池中的所有线程正在等待同一个池中的排队任务完成。 ForkJoinPool通过从join()调用中的其他线程窃取工作而不是简单地等待来避免此问题。例如:我可以使用ForkJoinPool的工作窃取行为来避免线程匮乏死锁吗?

private static class ForkableTask extends RecursiveTask<Integer> { 
    private final CyclicBarrier barrier; 

    ForkableTask(CyclicBarrier barrier) { 
     this.barrier = barrier; 
    } 

    @Override 
    protected Integer compute() { 
     try { 
      barrier.await(); 
      return 1; 
     } catch (InterruptedException | BrokenBarrierException e) { 
      throw new RuntimeException(e); 
     } 
    } 
} 

@Test 
public void testForkJoinPool() throws Exception { 
    final int parallelism = 4; 
    final ForkJoinPool pool = new ForkJoinPool(parallelism); 
    final CyclicBarrier barrier = new CyclicBarrier(parallelism); 

    final List<ForkableTask> forkableTasks = new ArrayList<>(parallelism); 
    for (int i = 0; i < parallelism; ++i) { 
     forkableTasks.add(new ForkableTask(barrier)); 
    } 

    int result = pool.invoke(new RecursiveTask<Integer>() { 
     @Override 
     protected Integer compute() { 
      for (ForkableTask task : forkableTasks) { 
       task.fork(); 
      } 

      int result = 0; 
      for (ForkableTask task : forkableTasks) { 
       result += task.join(); 
      } 
      return result; 
     } 
    }); 
    assertThat(result, equalTo(parallelism)); 
} 

但是用ExecutorService接口到ForkJoinPool,工作窃取时,似乎没有发生。例如:

private static class CallableTask implements Callable<Integer> { 
    private final CyclicBarrier barrier; 

    CallableTask(CyclicBarrier barrier) { 
     this.barrier = barrier; 
    } 

    @Override 
    public Integer call() throws Exception { 
     barrier.await(); 
     return 1; 
    } 
} 

@Test 
public void testWorkStealing() throws Exception { 
    final int parallelism = 4; 
    final ExecutorService pool = new ForkJoinPool(parallelism); 
    final CyclicBarrier barrier = new CyclicBarrier(parallelism); 

    final List<CallableTask> callableTasks = Collections.nCopies(parallelism, new CallableTask(barrier)); 
    int result = pool.submit(new Callable<Integer>() { 
     @Override 
     public Integer call() throws Exception { 
      int result = 0; 
      // Deadlock in invokeAll(), rather than stealing work 
      for (Future<Integer> future : pool.invokeAll(callableTasks)) { 
       result += future.get(); 
      } 
      return result; 
     } 
    }).get(); 
    assertThat(result, equalTo(parallelism)); 
} 

ForkJoinPool的实施粗粗一看,所有的常规ExecutorService API的使用ForkJoinTask小号来实现,那么,为什么发生死锁我不知道。

+2

我不认为偷工作可以避免死锁。一旦你陷入僵局,就无法取得进展。窃取工作只是通过允许线程在其队列为空时从其他队列中窃取来避免不平衡队列。 – markspace 2014-10-26 18:14:00

+0

@markspace在'ForkJoinTask'的实现中,'join()'尝试从deque运行其他作业而不是拖延,这可以避免死锁。由于'ForkJoinPool.invokeAll()'将'Callable's转换为'ForkJoinTask's,我预计它也能工作。 – 2014-10-27 13:53:00

回答

20

你差不多回答你自己的问题。解决方法是“ForkJoinPool通过从join()调用内部的其他线程窃取工作来避免此问题”。每当线程由于除ForkJoinPool.join()之外的其他原因而被阻塞时,不会发生此工作窃取,并且线程仅等待并且什么都不做。

原因是在Java中,ForkJoinPool不可能阻止其线程阻塞,而是给他们另外的工作。线程本身需要避免阻塞,而是要求池应该做的工作。这只在ForkJoinTask.join()方法中实现,而不是在其他任何阻塞方法中实现。如果在ForkJoinPool中使用Future,则还会看到饥饿死锁。

为什么工作窃取仅在ForkJoinTask.join()中实现,而不是在Java API中的任何其他阻止方法中实现?那么,有很多这样的阻塞方法(Object.wait(),Future.get()java.util.concurrent中的任何并发原语,I/O方法等),它们与ForkJoinPool无关,它只是API中的一个任意类,所以添加所有这些方法的特例都是不好的设计。这也会导致可能非常令人惊讶和不期望的影响。想象一下,例如,用户将任务传递给ExecutorService,等待Future,然后发现该任务在Future.get()中挂起的时间很长,这是因为正在运行的线程偷了其他一些(长时间运行的)工作项而不是等待Future并在结果可用后立即继续。一旦一个线程开始处理另一个任务,它将无法返回到原始任务,直到第二个任务完成。因此,其他阻止方法不会偷盗工作实际上是一件好事。对于ForkJoinTask,这个问题并不存在,因为重要的是尽快完成主要任务,所有任务一起尽可能有效地处理是非常重要的。

ForkJoinPool内偷工作也不可能实现你自己的方法,因为所有相关部分都不公开。

但是,实际上有第二种方法可以防止饥饿死锁。这被称为管理阻止。它不使用工作窃取(以避免上述问题),而且还需要将要阻塞的线程积极配合线程池。通过管理阻塞,线程通知线程池它可能被阻塞之前它调用潜在的阻塞方法,并且在阻塞方法完成时通知池。线程池然后知道有一个饥饿死锁的风险,并且如果其所有线程当前处于某个阻塞操作并且还有其他任务要执行,则可能会产生其他线程。请注意,由于额外线程的开销,这比盗取工作效率低。如果你实现了一个具有普通期货和管理式阻塞的递归并行算法,而不是ForkJoinTask和偷工作,附加线程的数量可能会变得非常大(因为在算法的“分裂”阶段,将创建大量任务并且给予立即阻塞并等待子任务结果的线程)。然而,饥饿死锁仍然被阻止,并且它避免了任务由于其线程同时开始另一个任务而必须等待很长时间的问题。

Java的ForkJoinPool也支持管理阻塞。为了使用它,需要实现接口ForkJoinPool.ManagedBlocker,以便任务想要执行的潜在阻塞方法从此接口的block方法中调用。然后该任务可能不会直接调用阻塞方法,而是需要调用静态方法ForkJoinPool.managedBlock(ManagedBlocker)。该方法在阻塞之前和之后处理与线程池的通信。如果当前任务没有在ForkJoinPool内执行,它也可以工作,然后它只是调用阻塞方法。

我在Java API(用于Java 7)中发现的实际使用托管阻塞的唯一地方是类Phaser。 (这个类是像互斥锁和锁存器这样的同步障碍,但更灵活和更强大。)因此,与ForkJoinPool任务中的Phaser同步应该使用受管阻塞,并且可以避免挨饿死锁(但ForkJoinTask.join()仍然是可取的,因为它使用工作窃取而不是管理阻止)。无论您是直接使用ForkJoinPool还是通过ExecutorService接口,这都可以工作。但是,如果您使用类别Executors创建的其他ExecutorService,则它不起作用,因为它们不支持受管理阻止。

在斯卡拉,受管阻塞的使用更广泛(description,API)。

+2

感谢您的回答,非常全面。尽管一个挑剔,'ForkJoinTask'实现在'get()'中执行与在join()中执行相同的操作。我的问题中的死锁主要来自尝试同步没有'ForkJoinPool.managedBlock()'(实际上,这两个例子在Java 7上的死锁)。改用'Phaser's,两者都可以工作。 – 2014-10-28 21:29:13

+0

也许你可以澄清[Java的线程的阻塞状态](http://docs.oracle.com/javase/7/docs/api/java/lang/Thread.State.html)如何对应于现代中的阻塞线程斯卡拉感。每个阻塞操作是否按照该链接上描述的方式等待Java监视器锁定?或者线程池是否以其他方式跟踪阻塞状态? – matanster 2015-03-16 10:11:18

+0

@matt我不确定你的意思是“阻塞现代斯卡拉意义上的线程”。 State.BLOCKED'和管理阻塞之间'的连接仅仅是一个知道很快,这可能是在BLOCKED状态的线程应该调用'managedBlock'告诉ForkJoinPool这个前期。 ForkJoinPool不会检测线程当前是否被阻塞。如果一个线程使用'managedBlock'但实际上并未被阻塞,那么ForkJoinPool仍然会增加线程的数量。 – 2015-03-16 16:04:06

0

我明白你在做什么,但我不知道为什么。屏障的思想是独立的线程可以等待对方达到共同点。你没有独立的线程。线程池,F/J,是Data Parallelism

你正在做的事情更加切合Task Parallelism

原因F /Ĵ继续是框架创建“延续线程”继续从双端获取工作的时候所有工作线程正在等待。

+0

障碍只是为了确保每个任务都安排在单独的线程上。我不认为“延续线程”是答案,如果你打印出'Thread.currentThread()。getId()',你会看到其中一个'ForkableTasks'运行在与之相同的线程中调用剩下的部分,并且总共只使用4个线程。 – 2014-10-26 22:35:16

+0

您不能保证哪个线程处理盗用工作的任务。所有任务转储到同一个提交队列中。取决于您使用哪个版本(Java7/8),将该块的工作线程替换为“继续”或“补偿”线程。你所做的并不是F/J(数据并行性)的优点。 – edharned 2014-10-27 14:26:15

0

当使用线程池的线程和任务数量有限(例如通过future.get())时,总会有线程匮乏的可能性。可以使用无限制的线程池(并准备好OutOfMemoryError),或者通过将阻塞任务分解为解锁部件,并在满足所需条件时激活,使用非阻塞任务。 Future类不能执行此操作(激活任务),但Java8中的CompletableFuture可以。还可以看到许多演员和数据流库,例如我df4j2

+0

传统的线程池确实如此。但'ForkJoinPool'实现'ForkJoinTask'上的'ExecutorService' API,所以我期望它的行为匹配。 – 2014-10-27 13:47:29