在嵌套的Java 8并行流动作中使用信号量可能是DEADLOCK。 这是一个错误吗?

考虑以下情况:我们使用Java 8并行流来执行并行forEach循环,例如,

IntStream.range(0,20).parallel().forEach(i -> { /* work done here */}) 

并行线程的数量由系统属性“java.util.concurrent.ForkJoinPool.common.parallelism”控制,通常等于处理器的数量。

现在假设我们想限制特定工作的并行执行次数 – 例如,因为该部分是内存密集型的,而内存约束意味着并行执行的限制。

限制并行执行的一种明显而优雅的方法是使用信号量( 此处建议),例如,以下代码片段将并行执行的数量限制为5:

  final Semaphore concurrentExecutions = new Semaphore(5); IntStream.range(0,20).parallel().forEach(i -> { concurrentExecutions.acquireUninterruptibly(); try { /* WORK DONE HERE */ } finally { concurrentExecutions.release(); } }); 

这很好用!

但是:在worker中使用任何其他并行流(在/* WORK DONE HERE */ )可能会导致死锁

对我来说,这是一个意外的行为。

说明:由于Java流使用ForkJoin池,因此内部forEach正在分叉,并且连接似乎正在等待。 但是,这种行为仍然出乎意料。 请注意,如果将"java.util.concurrent.ForkJoinPool.common.parallelism"设置为1,并行流甚至可以工作。

另请注意,如果存在内部并行forEach,则它可能不透明。

问题: 这种行为是否符合Java 8规范(在这种情况下,它意味着禁止在并行流工作者中使用信号量)或者这是一个错误?

为方便起见:以下是一个完整的测试用例。 除了“true,true”之外,两个布尔值的任何组合都有效,这会导致死锁。

澄清:为了明确说明,让我强调一个方面:在acquire信号量时不会出现死锁。 请注意,代码包含

  1. 获得信号量
  2. 运行一些代码
  3. 释放信号量

如果那段代码使用另一个并行流,则死锁发生在2. 然后在OTHER流内发生死锁。 因此,似乎不允许一起使用嵌套并行流和阻塞操作(如信号量)!

请注意,记录并行流使用ForkJoinPool并且ForkJoinPool和Semaphore属于同一个包 – java.util.concurrent (因此可以预期它们可以很好地互操作)。

 /* * (c) Copyright Christian P. Fries, Germany. All rights reserved. Contact: email@christian-fries.de. * * Created on 03.05.2014 */ package net.finmath.experiments.concurrency; import java.util.concurrent.Semaphore; import java.util.stream.IntStream; /** * This is a test of Java 8 parallel streams. * * The idea behind this code is that the Semaphore concurrentExecutions * should limit the parallel executions of the outer forEach (which is an * IntStream.range(0,numberOfTasks).parallel().forEach (for example: * the parallel executions of the outer forEach should be limited due to a * memory constrain). * * Inside the execution block of the outer forEach we use another parallel stream * to create an inner forEach. The number of concurrent * executions of the inner forEach is not limited by us (it is however limited by a * system property "java.util.concurrent.ForkJoinPool.common.parallelism"). * * Problem: If the semaphore is used AND the inner forEach is active, then * the execution will be DEADLOCKED. * * Note: A practical application is the implementation of the parallel * LevenbergMarquardt optimizer in * {@link http://finmath.net/java/finmath-lib/apidocs/net/finmath/optimizer/LevenbergMarquardt.html} * In one application the number of tasks in the outer and inner loop is very large (>1000) * and due to memory limitation the outer loop should be limited to a small (5) number * of concurrent executions. * * @author Christian Fries */ public class ForkJoinPoolTest { public static void main(String[] args) { // Any combination of the booleans works, except (true,true) final boolean isUseSemaphore = true; final boolean isUseInnerStream = true; final int numberOfTasksInOuterLoop = 20; // In real applications this can be a large number (eg > 1000). final int numberOfTasksInInnerLoop = 100; // In real applications this can be a large number (eg > 1000). final int concurrentExecusionsLimitInOuterLoop = 5; final int concurrentExecutionsLimitForStreams = 10; final Semaphore concurrentExecutions = new Semaphore(concurrentExecusionsLimitInOuterLoop); System.setProperty("java.util.concurrent.ForkJoinPool.common.parallelism",Integer.toString(concurrentExecutionsLimitForStreams)); System.out.println("java.util.concurrent.ForkJoinPool.common.parallelism = " + System.getProperty("java.util.concurrent.ForkJoinPool.common.parallelism")); IntStream.range(0,numberOfTasksInOuterLoop).parallel().forEach(i -> { if(isUseSemaphore) { concurrentExecutions.acquireUninterruptibly(); } try { System.out.println(i + "\t" + concurrentExecutions.availablePermits() + "\t" + Thread.currentThread()); if(isUseInnerStream) { runCodeWhichUsesParallelStream(numberOfTasksInInnerLoop); } else { try { Thread.sleep(10*numberOfTasksInInnerLoop); } catch (Exception e) { } } } finally { if(isUseSemaphore) { concurrentExecutions.release(); } } }); System.out.println("DONE"); } /** * Runs code in a parallel forEach using streams. * * @param numberOfTasksInInnerLoop Number of tasks to execute. */ private static void runCodeWhichUsesParallelStream(int numberOfTasksInInnerLoop) { IntStream.range(0,numberOfTasksInInnerLoop).parallel().forEach(j -> { try { Thread.sleep(10); } catch (Exception e) { } }); } } 

每当您将问题分解为任务时,这些任务可以在其他任务上被阻止,并尝试在有限的线程池中执行它们,您就有可能遇到池引发的死锁 。 请参见练习 8.1中的Java并发

这无疑是一个错误 – 在你的代码中。 您正在使用将阻止等待同一池中其他任务的结果的任务来填充FJ池。 有时候你会很幸运,事情就不会陷入僵局(就像并非所有的锁定错误一直导致死锁一样),但从根本上说,你在这里非常薄弱的​​滑冰。

在对ForkJoinPool和ForkJoinTask的源代码进行一些调查之后,我假设我找到了答案:

这是一个错误(在我看来),错误在ForkJoinTask doInvoke()中。 问题实际上与两个循环的嵌套有关,并且可能与使用信号量无关,但是,需要信号量(或外环中的s.th.阻塞)使问题变得明显并导致死锁(但我可以想象这个bug隐含的其他问题 – 请参阅嵌套Java 8并行forEach循环执行不佳。是否需要这种行为? )。

doInvoke()方法的实现目前如下所示:

 /** * Implementation for invoke, quietlyInvoke. * * @return status upon completion */ private int doInvoke() { int s; Thread t; ForkJoinWorkerThread wt; return (s = doExec()) < 0 ? s : ((t = Thread.currentThread()) instanceof ForkJoinWorkerThread) ? (wt = (ForkJoinWorkerThread)t).pool.awaitJoin(wt.workQueue, this) : externalAwaitDone(); } 

(也许在doJoin看起来很相似)。 在线

  ((t = Thread.currentThread()) instanceof ForkJoinWorkerThread) ? 

如果Thread.currentThread()是ForkJoinWorkerThread的实例,则进行测试。 此测试的原因是检查ForkJoinTask是否在池的工作线程或主线程上运行。 我相信这条线对于非嵌套并行是可行的,它允许区分当前任务是在主线程还是池工作器上运行。 但是,对于内循环的任务,这个测试是有问题的:让我们调用运行parallel()的线程 .forEeach 创建者线程 。 对于外部循环,创建者线程是主线程,它不是instanceof ForkJoinWorkerThreadinstanceof ForkJoinWorkerThread 。 但是,对于从ForkJoinWorkerThread运行的内部循环,创建者线程也是instanceof ForkJoinWorkerThreadinstanceof ForkJoinWorkerThread 。 因此,在这种情况下,测试((t = Thread.currentThread()) instanceof ForkJoinWorkerThread)总是正确的!

因此,我们总是调用pool.awaitJoint(wt.workQueue)

现在,请注意我们在该线程的FULL awaitJoint上调用awaitJoint (我相信这是一个额外的缺陷)。 看起来好像我们不仅加入了内循环任务,还加入了外循环的任务,我们加入了所有这些任务。 不幸的是,外部任务包含信号量。

为了certificate该错误与此相关,我们可以检查一个非常简单的解决方法。 我创建了一个运行内循环的t = new Thread() ,然后执行t.start(); t.join(); t.start(); t.join(); 。 请注意,这不会引入任何额外的并行性(我立即加入)。 但是,它将更改创建者线程的instanceof ForkJoinWorkerThread测试instanceof ForkJoinWorkerThread的结果。 (请注意,任务仍将提交到公共池)。 如果创建了该包装线程,则问题不再发生 - 至少在我当前的测试情况下。

我发了一个完整的演示到http://svn.finmath.net/finmath%20experiments/trunk/src/net/finmath/experiments/concurrency/ForkJoinPoolTest.java

在此测试代码中的组合

 final boolean isUseSemaphore = true; final boolean isUseInnerStream = true; final boolean isWrappedInnerLoopThread = false; 

将导致死锁,而组合

 final boolean isUseSemaphore = true; final boolean isUseInnerStream = true; final boolean isWrappedInnerLoopThread = true; 

(实际上所有其他组合)都不会。

更新:由于许多人指出使用信号量是危险的,我试图在没有信号量的情况下创建问题的演示。 现在,没有更多的僵局,但在我看来 - 意外的性能问题。 我在Nested Java 8并行创建了一个新post,因为每个循环执行得很差。 这种行为有望吗? 。 演示代码在这里: http : //svn.finmath.net/finmath%20experiments/trunk/src/net/finmath/experiments/concurrency/NestedParallelForEachTest.java

我在一个分析器(VisualVM)中运行你的测试,我同意:线程正在等待信号量和F / J池中的aWaitJoin()。

这个框架在join()方面存在严重问题。 我四年来一直在撰写关于这个框架的评论。 基本的连接问题从这里开始。

aWaitJoin()也有类似的问题。 您可以自己仔细阅读代码。 当框架到达工作deque的底部时,它会发出wait()。 这一切归结为这个框架无法进行上下文切换。

有一种方法可以让这个框架为停滞的线程创建补偿线程。 您需要实现ForkJoinPool.ManagedBlocker接口。 你怎么能这样做,我不知道。 您正在使用流运行基本API。 您没有实现Streams API并编写自己的代码。

我坚持上面的评论:一旦你将并行性转换为API,你就放弃了控制并行机制内部工作的能力。 API没有错误(除了使用错误的框架进行并行操作。)问题是信号量或用于控制API内并行性的任何其他方法都是危险的想法。