并行转换流时如何使用收集器

我实际上试图回答这个问题如何跳过从Files.lines获得的Stream 的行 。 所以我虽然这个收集器并不能很好地并行工作:

private static Collector<String, ?, List> oddLines() { int[] counter = {1}; return Collector.of(ArrayList::new, (l, line) -> { if (counter[0] % 2 == 1) l.add(line); counter[0]++; }, (l1, l2) -> { l1.addAll(l2); return l1; }); } 

但它的确有效。

编辑:它实际上没有工作; 我被我的输入集太小而无法触发任何并行性这一事实所迷惑; 见评论中的讨论

我认为它不会起作用,因为我想到了以下两个执行计划。


1. counter数组在所有线程之间共享。

线程t1读取Stream的第一个元素,因此满足if条件。 它将第一个元素添加到其列表中。 然后在他有时间更新数组值之前停止执行。

线程t2,从流的第4个元素开始,将其添加到其列表中。 所以我们最终得到了一个非想要的元素。

当然,既然这个collections家似乎有效,我猜它不会那样。 而且无论如何更新都不是primefaces的。


2.每个线程都有自己的数组副本

在这种情况下,更新没有更多的问题,但没有什么能阻止线程t2不会从流的第4个元素开始。 所以他也不像那样工作。


所以它似乎根本不起作用,这让我想到了……收集器是如何并行使用的?

有人能解释我基本上它是如何工作的以及为什么我的collections家在并行运行时工作?

非常感谢你!

parallel()源流传递到收集器就足以打破逻辑,因为共享状态( counter可能会从不同的任务中增加。 您可以validation,因为它永远不会为任何有限的流输入返回正确的结果:

  Stream lines = IntStream.range(1, 20000).mapToObj(i -> i + ""); System.out.println(lines.isParallel()); lines = lines.parallel(); System.out.println(lines.isParallel()); List collected = lines.collect(oddLines()); System.out.println(collected.size()); 

请注意,对于无限流(例如,从Files.lines()读取时),您需要在流中生成大量数据 ,因此它实际上要求任务同时运行一些块。

我的输出是:

 false true 12386 

这显然是错的。


正如@Holger在评论中正确指出的那样,当收集器指定CONCURRENTUNORDERED时会发生不同的竞争,在这种情况下,它们跨任务操作单个共享集合( ArrayList::new每个流调用一次), where – 与parallel() ,它将在每个任务的集合上运行累加器,然后使用您定义的组合器组合结果。

如果要将特征添加到收集器,则由于单个集合中的共享状态,您可能会遇到以下结果:

 false true Exception in thread "main" java.lang.ArrayIndexOutOfBoundsException: 73 at java.util.ArrayList.add(ArrayList.java:459) at de.jungblut.stuff.StreamPallel.lambda$0(StreamPallel.java:18) at de.jungblut.stuff.StreamPallel$$Lambda$3/1044036744.accept(Unknown Source) at java.util.stream.ReferencePipeline.lambda$collect$207(ReferencePipeline.java:496) at java.util.stream.ReferencePipeline$$Lambda$6/2003749087.accept(Unknown Source) at java.util.stream.ForEachOps$ForEachOp$OfRef.accept(ForEachOps.java:184) at java.util.stream.IntPipeline$4$1.accept(IntPipeline.java:250) at java.util.stream.Streams$RangeIntSpliterator.forEachRemaining(Streams.java:110) at java.util.Spliterator$OfInt.forEachRemaining(Spliterator.java:693) at java.util.stream.AbstractPipeline.copyInto(AbstractPipeline.java:512) at java.util.stream.ForEachOps$ForEachTask.compute(ForEachOps.java:291) at java.util.concurrent.CountedCompleter.exec(CountedCompleter.java:731) at java.util.concurrent.ForkJoinTask.doExec(ForkJoinTask.java:289) at java.util.concurrent.ForkJoinTask.doInvoke(ForkJoinTask.java:401) at java.util.concurrent.ForkJoinTask.invoke(ForkJoinTask.java:734) at java.util.stream.ForEachOps$ForEachOp.evaluateParallel(ForEachOps.java:160) at java.util.stream.ForEachOps$ForEachOp$OfRef.evaluateParallel(ForEachOps.java:174) at java.util.stream.AbstractPipeline.evaluate(AbstractPipeline.java:233) at java.util.stream.ReferencePipeline.forEach(ReferencePipeline.java:418) at java.util.stream.ReferencePipeline.collect(ReferencePipeline.java:496) at de.jungblut.stuff.StreamPallel.main(StreamPallel.java:32)12386 

实际上这个collections家工作只是巧合。 它不适用于自定义数据源。 考虑这个例子:

 List list = IntStream.range(0, 10).parallel().mapToObj(String::valueOf) .collect(oddLines()); System.out.println(list); 

这会产生不同的结果。 真正的原因只是因为BufferedReader.lines()流至少被java.util.Spliterators.IteratorSpliterator.BATCH_UNIT分割为1024行。如果你的行数大得多,即使使用BufferedReader也可能会失败:

 String data = IntStream.range(0, 10000).mapToObj(String::valueOf) .collect(Collectors.joining("\n")); List list = new BufferedReader(new StringReader(data)).lines().parallel() .collect(oddLines()); list.stream().mapToInt(Integer::parseInt).filter(x -> x%2 != 0) .forEach(System.out::println); 

如果收集器工作正常,这不应该打印任何东西。 但有时它打印。