我可以使用ForkJoinPool的工作窃取行为来避免线程饥饿死锁吗?

如果池中的所有线程都在等待同一池中的排队任务完成,则在正常线程池中发生线程饥饿死锁ForkJoinPool通过从join()调用中窃取其他线程的工作来避免这个问题,而不是简单地等待。 例如:

 private static class ForkableTask extends RecursiveTask { private final CyclicBarrier barrier; ForkableTask(CyclicBarrier barrier) { this.barrier = barrier; } @Override protected Integer compute() { try { barrier.await(); return 1; } catch (InterruptedException | BrokenBarrierException e) { throw new RuntimeException(e); } } } @Test public void testForkJoinPool() throws Exception { final int parallelism = 4; final ForkJoinPool pool = new ForkJoinPool(parallelism); final CyclicBarrier barrier = new CyclicBarrier(parallelism); final List forkableTasks = new ArrayList(parallelism); for (int i = 0; i < parallelism; ++i) { forkableTasks.add(new ForkableTask(barrier)); } int result = pool.invoke(new RecursiveTask() { @Override protected Integer compute() { for (ForkableTask task : forkableTasks) { task.fork(); } int result = 0; for (ForkableTask task : forkableTasks) { result += task.join(); } return result; } }); assertThat(result, equalTo(parallelism)); } 

但是当使用ExecutorService接口到ForkJoinPool ,似乎不会发生工作窃取。 例如:

 private static class CallableTask implements Callable { private final CyclicBarrier barrier; CallableTask(CyclicBarrier barrier) { this.barrier = barrier; } @Override public Integer call() throws Exception { barrier.await(); return 1; } } @Test public void testWorkStealing() throws Exception { final int parallelism = 4; final ExecutorService pool = new ForkJoinPool(parallelism); final CyclicBarrier barrier = new CyclicBarrier(parallelism); final List callableTasks = Collections.nCopies(parallelism, new CallableTask(barrier)); int result = pool.submit(new Callable() { @Override public Integer call() throws Exception { int result = 0; // Deadlock in invokeAll(), rather than stealing work for (Future future : pool.invokeAll(callableTasks)) { result += future.get(); } return result; } }).get(); assertThat(result, equalTo(parallelism)); } 

从粗略看看ForkJoinPool的实现,所有常规的ExecutorService API都是使用ForkJoinTask的,所以我不确定为什么会发生死锁。

你几乎回答了自己的问题。 解决方案是“ ForkJoinPool通过从join()调用内部的其他线程窃取工作来避免此问题的声明”。 每当除了ForkJoinPool.join()之外的某些其他原因阻塞线程时,这种工作窃取不会发生,并且线程只是等待并且什么都不做。

这样做的原因是,在Java中, ForkJoinPool不可能阻止其线程阻塞,而是为其提供其他工作。 线程本身需要避免阻塞,而是要求池应该做的工作。 这只在ForkJoinTask.join()方法中ForkJoinTask.join() ,而不是在任何其他阻塞方法中实现。 如果你在ForkJoinPool使用Future ,你也会看到饥饿僵局。

为什么工作窃取只在ForkJoinTask.join()ForkJoinTask.join()而不是在Java API中的任何其他阻塞方法中实现? 好吧,有很多这样的阻塞方法( Object.wait()Future.get()java.util.concurrent任何并发原语,I / O方法等),它们与ForkJoinPool无关,这只是API中的一个任意类,因此向所有这些方法添加特殊情况将是糟糕的设计。 它还可能导致非常令人惊讶和不希望的影响。 想象一下,例如,用户将任务传递给等待FutureExecutorService ,然后发现任务在Future.get()挂起很长时间只是因为正在运行的线程偷了一些其他(长时间运行的)工作项而不是等待Future并在结果可用后立即继续。 一旦线程开始处理另一个任务,它就无法返回到原始任务,直到第二个任务完成。 因此,其他阻止方法不会进行工作窃取实际上是一件好事。 对于ForkJoinTask ,此问题不存在,因为主要任务尽快继续并不重要,所有任务一起尽可能高效地处理是非常重要的。

ForkJoinPool执行工作窃取也不可能实现自己的方法,因为所有相关部分都不公开。

但是,实际上还有第二种方法可以防止饥饿死锁。 这称为托管阻止 。 它不使用工作窃取(以避免上面提到的问题),但也需要阻塞的线程积极配合线程池。 使用托管阻塞,线程告诉线程池它可能调用潜在阻塞方法之前被阻塞,并在阻塞方法完成时通知池。 然后线程池知道存在饥饿死锁的风险,并且如果其所有线程当前处于某些阻塞操作中并且还有其他任务要执行,则可能产生其他线程。 请注意,由于额外线程的开销,这比工作窃取效率低。 如果使用普通期货和托管阻塞实现递归并行算法而不是使用ForkJoinTask和工作窃取,则额外线程的数量会变得非常大(因为在算法的“除法”阶段,将创建许多任务并且给予立即阻止并等待子任务结果的线程)。 但是,仍然会阻止饥饿死锁,并且它避免了任务必须等待很长时间的问题,因为它的线程同时开始处理另一个任务。

ForkJoinPool of Java还支持托管阻止。 要使用它,需要实现ForkJoinPool.ManagedBlocker接口,以便从该接口的block方法中调用任务要执行的潜在阻塞方法。 然后任务可能不会直接调用阻塞方法,而是需要调用静态方法ForkJoinPool.managedBlock(ManagedBlocker) 。 此方法在阻塞之前和之后处理与线程池的通信。 如果当前任务未在ForkJoinPool执行,它也可以工作,然后它只调用阻塞方法。

我在Java API(Java 7)中找到的唯一实际使用托管阻塞的地方是Phaser类。 (这个类是同步屏障,如互斥锁和锁存器,但更灵活,更强大。)因此,与ForkJoinPool任务内的Phaser同步应该使用托管阻塞,并且可以避免饥饿死锁(但是ForkJoinTask.join()仍然是可取的,因为它使用工作窃取而不是托管阻止)。 无论您是直接使用ForkJoinPool还是通过其ExecutorService接口,这都有效。 但是,如果您使用类Executors创建的任何其他ExecutorService ,它将不起作用,因为它们不支持托管阻止。

在Scala中,托管阻塞的使用更为普遍( 描述 , API )。

我明白你在做什么,但我不知道为什么。 屏障的想法是如此独立的线程可以等待彼此达到共同点。 你没有独立的线程。 线程池,F / J,用于数据并行

你正在做一些更适合任务并行的事情

F / J继续的原因是框架创建“延续线程”以在所有工作线程等待时继续从deques中获取工作。