在multithreading情况下使用限制流的最佳方式性能

我观看了JoséPaumard在InfoQ上的演讲: http : //www.infoq.com/fr/presentations/jdk8-lambdas-streams-collectors (法语)

问题是我被困在这一点上。 要使用流multithreading收集1M Long 我们可以这样做:

Stream stream = Stream.generate(() -> ThreadLocalRandom.current().nextLong()) ; List list1 = stream.parallel().limit(10_000_000).collect(Collectors.toList()) ; 

但考虑到线程始终检查所述限制以阻碍性能。

在那次演讲中我们也看到了第二个解决方案:

 Stream stream = ThreadLocalRandom.current().longs(10_000_000).mapToObj(Long::new) ; List list = stream.parallel().collect(Collectors.toList()) ; 

它似乎是更好的表现。

所以这是我的问题:为什么第二个代码更好,是否有更好的,或者至少成本更低的方法呢?

这是依赖实现的限制。 关注并行性能的开发人员必须理解的一件事是,可预测的流大小通常有助于并行性能,因为它们允许平衡分配工作负载。

这里的问题是,通过Stream.generate()limit()创建的无限流的组合不会产生具有可预测大小的流,尽管它看起来对我们来说是完全可预测的。

我们可以使用以下帮助器方法检查它:

 static void sizeOf(String op, IntStream stream) { final Spliterator.OfInt s = stream.spliterator(); System.out.printf("%-18s%5d, %d%n", op, s.getExactSizeIfKnown(), s.estimateSize()); } 

然后

 sizeOf("randoms with size", ThreadLocalRandom.current().ints(1000)); sizeOf("randoms with limit", ThreadLocalRandom.current().ints().limit(1000)); sizeOf("range", IntStream.range(0, 100)); sizeOf("range map", IntStream.range(0, 100).map(i->i)); sizeOf("range filter", IntStream.range(0, 100).filter(i->true)); sizeOf("range limit", IntStream.range(0, 100).limit(10)); sizeOf("generate limit", IntStream.generate(()->42).limit(10)); 

将打印

 randoms with size 1000, 1000 randoms with limit -1, 9223372036854775807 range 100, 100 range map 100, 100 range filter -1, 100 range limit -1, 100 generate limit -1, 9223372036854775807 

所以我们看到,像Random.ints(size)IntStream.range(…)这样的某些源产生具有可预测大小的流,并且某些中间操作(如map能够携带信息,因为他们知道大小不受影响。 其他像filterlimit不会传播大小(作为已知的确切大小)。

很明显, filter不能预测元素的实际数量,但是它提供了源大小作为估计,这是合理的,因为这是可以通过filter的元素的最大数量。

相反,当前limit实现不提供大小,即使源具有确切的大小,并且我们知道可预测的大小与min(source size, limit)一样简单。 相反,它甚至报告了一个荒谬的估计大小(来源的大小),尽管事实上已知结果大小永远不会高于限制。 在无限流的情况下,我们还有一个额外的障碍,即流所基于的Spliterator接口没有办法报告它是无限的。 在这些情况下,无限流+限制返回Long.MAX_VALUE作为估计,这意味着“我甚至无法猜测”。

因此,根据经验,当前实现时,程序员应该避免在有预先在流的源处指定所需大小的方法时使用limit 。 但是,由于limit有序并行流(不适用于randoms或generate )的情况下也存在显着(记录)的缺点,因此大多数开发人员无论如何都要避免limit

为什么第二个代码更好?

在第一种情况下,您创建无限源,将其拆分为并行执行到一堆任务,每个任务提供无限数量的元素,然后限制结果的整体大小 。 即使源是无序的,这也意味着一些开销。 在这种情况下,各个任务应相互通信以检查何时达到整体大小。 如果他们经常谈话,这会增加争用。 如果他们少说话,他们实际上产生的数量超过了必要的数量,然后放弃了一些。 我相信,实际的流API实现是在任务之间减少对话,但这实际上导致产生的数量超过了必要的数量。 这也会增加内存消耗并激活垃圾收集器。

相比之下,在第二种情况下,您可以创建已知大小的有限源。 当任务被分成子任务时,它们的大小也被很好地定义,并且它们总共产生所请求的随机数,而根本不需要彼此交谈。 这就是为什么它更快。

有没有更好的,或者至少成本更低的方式呢?

你的代码示例中最大的问题是装箱。 如果您需要10_000_000个随机数,那么将它们分别存储并存储在List是非常糟糕的:您创建了大量不必要的对象,执行了许多堆分配等等。 用原始流替换它:

 long[] randomNumbers = ThreadLocalRandom.current().longs(10_000_000).parallel().toArray(); 

这会快得多(可能是一个数量级)。

您也可以考虑使用新的Java-8 SplittableRandom类。 它提供了大致相同的性能,但生成的随机数具有更高的质量(包括通过DieHarder 3.31.1 ):

 long[] randomNumbers = new SplittableRandom().longs(10_000_000).parallel().toArray(); 

JDK docs对这种行为有很好的解释,它是为了并行处理而牺牲性能的排序约束

来自doc的限制function文本 – https://docs.oracle.com/javase/8/docs/api/java/util/stream/LongStream.html

虽然limit()通常是顺序流管道上的廉价操作,但在有序并行管道上可能非常昂贵,特别是对于maxSize的大值,因为limit(n)被约束为不仅返回任何n个元素,而是第一个n遇到订单中的元素。 如果您的情境的语义允许,使用无序流源(例如generate(LongSupplier))或使用BaseStream.unordered()删除排序约束可能会导致并行管道中limit()的显着加速。 如果需要与遇到顺序的一致性,并且您在并行管道中使用limit()遇到性能不佳或内存利用率,则使用sequential()切换到顺序执行可能会提高性能。 大段引用