嵌套的Java 8并行forEach循环表现不佳。 这种行为有望吗?

注意:我已经在另一个SOpost中解决了这个问题 – 在嵌套的Java 8并行流动作中使用信号量可能是DEADLOCK。 这是一个错误吗? – 但是这篇文章的标题表明问题与使用信号量有关 – 这有点分散了讨论的注意力。 我正在创建这个,以强调嵌套循环可能有性能问题 – 虽然这两个问题可能是一个共同的原因(也许是因为我花了很多时间来弄清楚这个问题)。 (我不认为它是重复的,因为它强调另一种症状 – 但如果你只是删除它)。

问题:如果你嵌套两个Java 8 stream.parallel()。forEach循环并且所有任务都是独立的,无状态的等等 – 除了被提交到公共FJ池 – 然后在并行循环内嵌套并行循环执行得更差而不是在并行循环内嵌套顺序循环。 更糟糕的是:如果同步包含内循环的操作,您将获得DEADLOCK。

演示性能问题

如果没有“同步”,您仍然可以观察到性能问题。 您可以在以下url找到演示代码: http : //svn.finmath.net/finmath%20experiments/trunk/src/net/finmath/experiments/concurrency/NestedParallelForEachTest.java (有关更详细的说明,请参阅JavaDoc)。

我们的设置如下:我们有一个嵌套的stream.parallel()。forEach()。

  • 内环是独立的(无状态,无干扰等 – 除了使用公共池之外)并且在最坏的情况下总共消耗1秒,即如果是顺序处理的话。
  • 外循环的一半任务在该循环之前消耗10秒。
  • 在该循环之后,一半消耗10秒。
  • 因此,每个线程总共消耗11秒(最坏情况)。 *我们有一个布尔值,允许将内部循环从parallel()切换到sequential()。

现在:将24个外循环任务提交到具有并行性的池8我们预计24/8 * 11 =最多33秒(在8核或更好的机器上)。

结果是:

  • 内部顺序循环:33秒。
  • 内部并行循环:> 80秒(我有92秒)。

问题:你能证实这种行为吗? 这是人们对框架的期望吗? (我现在更加小心,声称这是一个错误,但我个人认为这是由于ForkJoinTask的实现中的一个错误。备注:我已将此发布到并发兴趣(请参阅http:// cs.oswego.edu/pipermail/concurrency-interest/2014-May/012652.html ),但到目前为止我没有得到确认)。

certificate了僵局

以下代码将为DEADLOCK

// Outer loop IntStream.range(0,numberOfTasksInOuterLoop).parallel().forEach(i -> { doWork(); synchronized(this) { // Inner loop IntStream.range(0,numberOfTasksInInnerLoop).parallel().forEach(j -> { doWork(); }); } }); 

其中numberOfTasksInOuterLoop = 24numberOfTasksInInnerLoop = 240outerLoopOverheadFactor = 10000doWork是一些无状态CPU刻录机。

您可以在http://svn.finmath.net/finmath%20experiments/trunk/src/net/finmath/experiments/concurrency/NestedParallelForEachAndSynchronization.java中找到完整的演示代码(有关更详细的说明,请参阅JavaDoc)。

这种行为有望吗? 请注意,有关Java并行流的文档未提及嵌套或同步的任何问题。 此外,没有提到使用公共fork-join-pool的事实。

更新

关于性能问题的另一个测试可以在http://svn.finmath.net/finmath%20experiments/trunk/src/net/finmath/experiments/concurrency/NestedParallelForEachBenchmark.java找到 – 这个测试没有任何阻塞操作(没有线程) .sleep和not synchronized)。 我在这里写了一些更多的评论: http : //christian-fries.de/blog/files/2014-nested-java-8-parallel-foreach.html

更新2

似乎这个问题和更严重的带有信号量的DEADLOCK已在Java8 u40中得到修复。

问题是你配置的相当有限的并行性被外部流处理吃掉:如果你说你需要八个线程并使用parallel()处理一个超过八个项的流,它将创建八个工作线程并让它们处理项目。

然后在您的消费者中使用parallel()处理另一个流,但是没有剩余的工作线程。 由于工作线程被阻塞等待内部流处理的结束,因此ForkJoinPool必须创建违反配置的并行性的新工作线程。 在我看来,它不会回收这些延伸线程,但让它们在处理后立即死亡。 因此,在内部处理过程中,会创建和处理新线程,这是一项昂贵的操作。

您可能会认为启动线程无法为并行流处理的计算做出贡献,但只是等待结果,但即使修复了这个问题,您仍然会遇到一个很难(如果有的话)修复的一般问题:

每当工作线程数与外部流项目之间的比率较低时,实现将全部用于外部流,因为它不知道该流是外部流。 因此,并行执行内部流请求比可用更多的工作线程。 使用调用程序线程为计算做出贡献可以以性能等于串行计算的方式修复它,但是在这里获得并行执行的优势并不能与固定数量的工作线程的概念一起使用。

请注意,此处您正在研究此问题的表面,因为您有相当平衡的项目处理时间。 如果内部项目和外部项目的处理分歧(与同一级别的项目相比),问题将更加严重。


更新:通过分析和查看代码,似乎ForkJoinPool尝试使用等待线程进行“工作窃取”,但使用不同的代码取决于Thread是工作线程还是其他线程的事实。 结果,一个工作线程实际上正在等待大约80%的时间并且做很少甚至没有工作,而其他线程确实有助于计算……


更新2:为了完整性,这里是注释中描述的简单并行执行方法。 由于它将每个项目排队,因此当单个项目的执行时间相当小时,它会产生很大的开销。 所以它不是一个复杂的解决方案,而是一个可以处理长时间运行的任务而不需要太多魔法的演示……

 import java.lang.reflect.UndeclaredThrowableException; import java.util.concurrent.*; import java.util.function.IntConsumer; import java.util.stream.Collectors; import java.util.stream.IntStream; public class NestedParallelForEachTest1 { static final boolean isInnerStreamParallel = true; // Setup: Inner loop task 0.01 sec in worse case. Outer loop task: 10 sec + inner loop. This setup: (100 * 0.01 sec + 10 sec) * 24/8 = 33 sec. static final int numberOfTasksInOuterLoop = 24; // In real applications this can be a large number (eg > 1000). static final int numberOfTasksInInnerLoop = 100; // In real applications this can be a large number (eg > 1000). static final int concurrentExecutionsLimitForStreams = 8; public static void main(String[] args) throws InterruptedException, ExecutionException { System.out.println(System.getProperty("java.version")+" "+System.getProperty("java.home")); new NestedParallelForEachTest1().testNestedLoops(); E.shutdown(); } final static ThreadPoolExecutor E = new ThreadPoolExecutor( concurrentExecutionsLimitForStreams, concurrentExecutionsLimitForStreams, 2, TimeUnit.MINUTES, new SynchronousQueue<>(), (r,e)->r.run() ); public static void parallelForEach(IntStream s, IntConsumer c) { s.mapToObj(i->E.submit(()->c.accept(i))).collect(Collectors.toList()) .forEach(NestedParallelForEachTest1::waitOrHelp); } static void waitOrHelp(Future f) { while(!f.isDone()) { Runnable r=E.getQueue().poll(); if(r!=null) r.run(); } try { f.get(); } catch(InterruptedException ex) { throw new RuntimeException(ex); } catch(ExecutionException eex) { Throwable t=eex.getCause(); if(t instanceof RuntimeException) throw (RuntimeException)t; if(t instanceof Error) throw (Error)t; throw new UndeclaredThrowableException(t); } } public void testNestedLoops(NestedParallelForEachTest1 this) { long start = System.nanoTime(); // Outer loop parallelForEach(IntStream.range(0,numberOfTasksInOuterLoop), i -> { if(i < 10) sleep(10 * 1000); if(isInnerStreamParallel) { // Inner loop as parallel: worst case (sequential) it takes 10 * numberOfTasksInInnerLoop millis parallelForEach(IntStream.range(0,numberOfTasksInInnerLoop), j -> sleep(10)); } else { // Inner loop as sequential IntStream.range(0,numberOfTasksInInnerLoop).sequential().forEach(j -> sleep(10)); } if(i >= 10) sleep(10 * 1000); }); long end = System.nanoTime(); System.out.println("Done in "+TimeUnit.NANOSECONDS.toSeconds(end-start)+" sec."); } static void sleep(int milli) { try { Thread.sleep(milli); } catch (InterruptedException ex) { throw new AssertionError(ex); } } } 

稍微整理一下代码之后。 我没有看到与Java 8更新45相同的结果。毫无疑问是一个开销,但与你谈论的时间跨度相比,它非常小。

当您使用外部循环消耗池中的所有可用线程时,预计会出现死锁的可能性,从而不会留下执行内部循环的线程。

打印以下程序

 isInnerStreamParallel: false, isCPUTimeBurned: false java.util.concurrent.ForkJoinPool.common.parallelism = 8 Done in 33.1 seconds. isInnerStreamParallel: false, isCPUTimeBurned: true java.util.concurrent.ForkJoinPool.common.parallelism = 8 Done in 33.0 seconds. isInnerStreamParallel: true, isCPUTimeBurned: false java.util.concurrent.ForkJoinPool.common.parallelism = 8 Done in 32.5 seconds. isInnerStreamParallel: true, isCPUTimeBurned: true java.util.concurrent.ForkJoinPool.common.parallelism = 8 Done in 32.6 seconds. 

代码

 import java.util.stream.IntStream; public class NestedParallelForEachTest { // Setup: Inner loop task 0.01 sec in worse case. Outer loop task: 10 sec + inner loop. This setup: (100 * 0.01 sec + 10 sec) * 24/8 = 33 sec. static final int numberOfTasksInOuterLoop = 24; // In real applications this can be a large number (eg > 1000). static final int numberOfTasksInInnerLoop = 100; // In real applications this can be a large number (eg > 1000). static final int concurrentExecutionsLimitForStreams = 8; // java.util.concurrent.ForkJoinPool.common.parallelism public static void main(String[] args) { testNestedLoops(false, false); testNestedLoops(false, true); testNestedLoops(true, false); testNestedLoops(true, true); } public static void testNestedLoops(boolean isInnerStreamParallel, boolean isCPUTimeBurned) { System.out.println("isInnerStreamParallel: " + isInnerStreamParallel + ", isCPUTimeBurned: " + isCPUTimeBurned); long start = System.nanoTime(); System.setProperty("java.util.concurrent.ForkJoinPool.common.parallelism",Integer.toString(concurrentExecutionsLimitForStreams)); System.out.println("java.util.concurrent.ForkJoinPool.common.parallelism = " + System.getProperty("java.util.concurrent.ForkJoinPool.common.parallelism")); // Outer loop IntStream.range(0, numberOfTasksInOuterLoop).parallel().forEach(i -> { // System.out.println(i + "\t" + Thread.currentThread()); if(i < 10) burnTime(10 * 1000, isCPUTimeBurned); IntStream range = IntStream.range(0, numberOfTasksInInnerLoop); if (isInnerStreamParallel) { // Inner loop as parallel: worst case (sequential) it takes 10 * numberOfTasksInInnerLoop millis range = range.parallel(); } else { // Inner loop as sequential } range.forEach(j -> burnTime(10, isCPUTimeBurned)); if(i >= 10) burnTime(10 * 1000, isCPUTimeBurned); }); long end = System.nanoTime(); System.out.printf("Done in %.1f seconds.%n", (end - start) / 1e9); } static void burnTime(long millis, boolean isCPUTimeBurned) { if (isCPUTimeBurned) { long end = System.nanoTime() + millis * 1000000; while (System.nanoTime() < end) ; } else { try { Thread.sleep(millis); } catch (InterruptedException e) { throw new AssertionError(e); } } } } 

我可以确认这仍然是8u72中的性能问题,尽管它不会再陷入僵局。 并行终端操作仍然使用ForkJoinPool上下文之外的ForkJoinTask实例完成,这意味着每个并行流仍然共享公共池 。

为了certificate一个简单的病态案例:

 import java.util.concurrent.ForkJoinPool; import java.util.stream.IntStream; public class ParallelPerf { private static final Object LOCK = new Object(); private static void runInNewPool(Runnable task) { ForkJoinPool pool = new ForkJoinPool(); try { pool.submit(task).join(); } finally { pool.shutdown(); } } private static  T runInNewPool(Callable task) { ForkJoinPool pool = new ForkJoinPool(); try { return pool.submit(task).join(); } finally { pool.shutdown(); } } private static void innerLoop() { IntStream.range(0, 32).parallel().forEach(i -> { // System.out.println(Thread.currentThread().getName()); try { Thread.sleep(5); } catch (InterruptedException e) { Thread.currentThread().interrupt(); } }); } public static void main(String[] args) { System.out.println("==DEFAULT=="); long startTime = System.nanoTime(); IntStream.range(0, 32).parallel().forEach(i -> { synchronized (LOCK) { innerLoop(); } // System.out.println(" outer: " + Thread.currentThread().getName()); }); System.out.println(System.nanoTime() - startTime); System.out.println("==NEW POOLS=="); startTime = System.nanoTime(); IntStream.range(0, 32).parallel().forEach(i -> { synchronized (LOCK) { runInNewPool(() -> innerLoop()); } // System.out.println(" outer: " + Thread.currentThread().getName()); }); System.out.println(System.nanoTime() - startTime); } } 

第二次运行将innerLoop传递给runInNewPool而不是直接调用它。 在我的机器上(i7-4790,8个CPU线程),我获得了大约4倍的加速:

 ==DEFAULT== 4321223964 ==NEW POOLS== 1015314802 

取消注释其他打印语句会使问题变得明显:

 [...] ForkJoinPool.commonPool-worker-6 ForkJoinPool.commonPool-worker-6 ForkJoinPool.commonPool-worker-6 outer: ForkJoinPool.commonPool-worker-6 ForkJoinPool.commonPool-worker-3 ForkJoinPool.commonPool-worker-3 [...] ForkJoinPool.commonPool-worker-3 ForkJoinPool.commonPool-worker-3 outer: ForkJoinPool.commonPool-worker-3 ForkJoinPool.commonPool-worker-4 ForkJoinPool.commonPool-worker-4 [...] 

公共池工作线程在同步块中堆积,一次只能有一个线程进入。 由于内部并行操作使用相同的池,并且池中的所有其他线程都在等待锁定,因此我们获得单线程执行。

以及使用单独的ForkJoinPool实例的结果:

 [...] ForkJoinPool-1-worker-0 ForkJoinPool-1-worker-6 ForkJoinPool-1-worker-5 outer: ForkJoinPool.commonPool-worker-4 ForkJoinPool-2-worker-1 ForkJoinPool-2-worker-5 [...] ForkJoinPool-2-worker-7 ForkJoinPool-2-worker-3 outer: ForkJoinPool.commonPool-worker-1 ForkJoinPool-3-worker-2 ForkJoinPool-3-worker-5 [...] 

我们仍然一次在一个工作线程上运行内部循环,但内部并行操作每次都获得一个新的池,并且可以利用其所有工作线程。

这是一个人为的例子,但删除同步块仍然显示出类似的速度差异,因为内部和外部循环仍然在相同的工作线程上竞争。 在multithreading中使用并行流时,multithreading应用程序需要小心,因为这可能导致重叠时随机减速。

这是所有终端操作的问题,而不仅仅是forEach ,因为它们都在公共池中运行任务。 我正在使用上面的runInNewPool方法作为解决方法,但希望这将在某些时候内置到标准库中。