Java 8流串行和并行性能

在我的机器上,下面的程序打印:

OptionalLong[134043] PARALLEL took 127869 ms OptionalLong[134043] SERIAL took 60594 ms 

我不清楚为什么串行执行程序比并行执行程序要快。 我已经在一个相对安静的8gb盒子上给了两个程序-Xms2g -Xmx2g 。 有人可以澄清最新情况吗?

 import java.util.stream.LongStream; import java.util.stream.LongStream.Builder; public class Problem47 { public static void main(String[] args) { final long startTime = System.currentTimeMillis(); System.out.println(LongStream.iterate(1, n -> n + 1).parallel().limit(1000000).filter(n -> fourConsecutives(n)).findFirst()); final long endTime = System.currentTimeMillis(); System.out.println(" PARALLEL took " +(endTime - startTime) + " ms"); final long startTime2 = System.currentTimeMillis(); System.out.println(LongStream.iterate(1, n -> n + 1).limit(1000000).filter(n -> fourConsecutives(n)).findFirst()); final long endTime2 = System.currentTimeMillis(); System.out.println(" SERIAL took " +(endTime2 - startTime2) + " ms"); } static boolean fourConsecutives(final long n) { return distinctPrimeFactors(n).count() == 4 && distinctPrimeFactors(n + 1).count() == 4 && distinctPrimeFactors(n + 2).count() == 4 && distinctPrimeFactors(n + 3).count() == 4; } static LongStream distinctPrimeFactors(long number) { final Builder builder = LongStream.builder(); final long limit = number / 2; long n = number; for (long i = 2; i <= limit; i++) { while (n % i == 0) { builder.accept(i); n /= i; } } return builder.build().distinct(); } } 

虽然Brian Goetz对你的设置是正确的,例如你应该使用.range(1, 1000000)而不是.iterate(1, n -> n + 1).limit(1000000)并且你的基准方法非常简单,我想要强调重点:

即使解决了这些问题,即使使用挂钟和TaskManager,您也可以看到出现了问题。 在我的机器上,操作大约需要半分钟,您可以看到并行性在大约两秒后降至单核。 即使一个专门的基准测试工具可以产生不同的结果,除非你想一直在基准工具中运行你的最终应用程序,否则无关紧要……

现在我们可以尝试模拟有关您的设置的更多信息,或者告诉您应该学习有关Fork / Join框架的特殊内容,就像实现者在讨论列表中所做的那样 。

或者我们尝试替代实施:

 ExecutorService es=Executors.newFixedThreadPool( Runtime.getRuntime().availableProcessors()); AtomicLong found=new AtomicLong(Long.MAX_VALUE); LongStream.range(1, 1000000).filter(n -> found.get()==Long.MAX_VALUE) .forEach(n -> es.submit(()->{ if(found.get()>n && fourConsecutives(n)) for(;;) { long x=found.get(); if(x 

在我的机器上,它可以实现我所期望的并行执行只需要稍微多一点的⟨sequential time⟩/⟨number of cpu cores⟩ 。 无需更改fourConsecutives实现中的任何内容。

最重要的是,至少在处理单个项目需要大量时间时,当前的Stream实现(或底层的Fork / Join框架)存在问题,如本相关问题中已经讨论的那样 。 如果你想要可靠的并行性,我建议使用经过validation和测试过的ExecutorService 。 正如您在我的示例中所看到的,它并不意味着删除Java 8function,它们很好地结合在一起。 只应谨慎使用Stream.parallel引入的自动并行(给定当前实现)。

我们可以更容易并行执行,但我们不一定能使并行化变得容易。

代码中的罪魁祸首是limit + parallel的组合。 实现limit()对于顺序流来说是微不足道的,但对并行流来说相当昂贵。 这是因为限制操作的定义与流的遭遇顺序相关联。 具有limit()的流通常比顺序流更慢,除非每个元素完成的计算非常高。

您选择的流源也限制了并行性。 使用iterate(0, n->n+1)给出正整数,但iterate基本上是顺序的; 在计算第(n-1)个元素之前,不能计算第n个元素。 因此,当我们尝试拆分此流时,我们最终会分裂(首先,rest)。 尝试使用range(0,k)代替; 这更好地分裂,通过随机访问的一半整齐地分裂。