收集器的组合器function是否可以用于顺序流?

示例程序:

public final class CollectorTest { private CollectorTest() { } private static  BinaryOperator nope() { return (t, u) -> { throw new UnsupportedOperationException("nope"); }; } public static void main(final String... args) { final Collector<Integer, ?, List> c = Collector.of(ArrayList::new, List::add, nope()); IntStream.range(0, 10_000_000).boxed().collect(c); } } 

因此,为了简化这里的问题,没有最终的转换,因此生成的代码非常简单。

现在, IntStream.range()生成一个顺序流。 我只是将结果IntegerInteger ,然后我设计的Collector将它们收集到List 。 很简单。

无论我运行多少次这个示例程序, UnsupportedOperationException不会命中,这意味着永远不会调用我的虚拟组合器。

我有点期待这个,但后来我已经误解了流,我不得不问这个问题……

当流保证顺序时,是否可以调用Collector的组合器?

仔细阅读ReduceOps.java中的流实现代码会发现,只有在ReduceTask完成时ReduceTask调用combine函数,并且仅在并行评估管道时才使用ReduceTask实例。 因此, 在当前实现中,在评估顺序流水线时从不调用组合器。

但是,规范中没有任何内容可以保证这一点。 Collector是对其实现提出要求的接口,并且没有为顺序流授予豁免。 就个人而言,我发现很难想象为什么顺序管道评估可能需要调用组合器,但是比我更有想象力的人可能会发现它的巧妙用途,并实现它。 规范允许它,即使今天的实现没有这样做,你仍然需要考虑它。

这应该不足为奇。 流API的设计中心是通过顺序执行在平等的基础上支持并行执行。 当然,程序可以观察它是顺序执行还是并行执行。 但API的设计是支持一种允许的编程风格。

如果您正在编写一个收集器,并且您发现编写关联组合器函数是不可能的(或者不方便或困难),导致您希望将流限制为顺序执行,这可能意味着您正朝着错误的方向前进。 现在是时候退一步,考虑以不同的方式解决问题。

不需要关联组合器function的常见缩减式操作称为fold-left 。 主要特点是折叠function严格从左到右应用,一次进行一次。 我不知道并行化折叠左边的方法。

当人们试图以我们一直在谈论的方式扭曲collections家时,他们通常会寻找像左撇子这样的东西。 Streams API没有对此操作的直接API支持,但它很容易编写。 例如,假设您要使用此操作减少字符串列表:重复第一个字符串,然后追加第二个字符串。 很容易certificate此操作不是关联的:

 List list = Arrays.asList("a", "b", "c", "d", "e"); System.out.println(list.stream() .collect(StringBuilder::new, (a, b) -> a.append(a.toString()).append(b), (a, b) -> a.append(a.toString()).append(b))); // BROKEN -- NOT ASSOCIATIVE 

顺序运行,这会产生所需的输出:

 aabaabcaabaabcdaabaabcaabaabcde 

但是当并行运行时,它可能产生这样的东西:

 aabaabccdde 

由于它按顺序“工作”,我们可以通过调用sequential()并通过让组合器抛出exception来强制执行此操作。 此外,供应商必须只调用一次。 没有办法合并中间结果,所以如果供应商被召唤两次,我们就已经遇到了麻烦。 但由于我们“知道”供应商仅在顺序模式下被调用一次,因此大多数人并不担心这一点。 事实上,我已经看到人们写“供应商”,这些供应商会返回一些现有的对象,而不是创建一个新的对象,这违反了供应商合同。

在使用3-argforms的collect() ,我们有三个函数中的两个破坏了它们的契约。 这不应该告诉我们以不同的方式做事吗?

这里的主要工作是由累加器function完成的。 为了实现折叠式缩减,我们可以使用forEachOrdered()以严格的从左到右的顺序应用此函数。 我们必须在之前和之后做一些设置和完成代码,但这没问题:

 StringBuilder a = new StringBuilder(); list.parallelStream() .forEachOrdered(b -> a.append(a.toString()).append(b)); System.out.println(a.toString()); 

当然,这可以并行工作,但并行运行的性能优势可能会因forEachOrdered()的排序要求而有所抵消。

总之,如果您发现自己想要进行可变缩减但是缺少关联组合器function,导致您将流限制为顺序执行,则将问题重新设置为向左折叠操作并在累加器上使用forEachRemaining()function。

正如之前@MarkoTopolnik和@Duncan的评论中所观察到的,无法保证在顺序模式下调用Collector.combiner()来产生减少的结果。 实际上,Java doc在这一点上有点主观,这可能导致不恰当的解释。

(…)并行实现将对输入进行分区,为每个分区创建结果容器,将每个分区的内容累积到该分区的子结果中, 然后使用组合器函数将子结果合并为组合结果

根据NoBlogDefFound, combinator仅用于并行模式。 请参阅以下部分报价:

combiner()用于将两个累加器连接成一个。 当收集器并行执行时,使用它,首先独立地拆分输入流和收集部件。

为了更清楚地说明这个问题,我重写了第一个代码,并提出了两种方法(串行和并行)。


 public final class CollectorTest { private CollectorTest() { } private static  BinaryOperator nope() { return (t, u) -> { throw new UnsupportedOperationException("nope"); }; } public static void main(final String... args) { final Collector> c = Collector .of(ArrayList::new, List::add, nope()); // approach sequential Stream sequential = IntStream .range(0, 10_000_000) .boxed(); System.out.println("isParallel:" + sequential.isParallel()); sequential .collect(c); // approach parallel Stream parallel = IntStream .range(0, 10_000_000) .parallel() .boxed(); System.out.println("isParallel:" + parallel.isParallel()); parallel .collect(c); } } 

运行此代码后,我们可以获得输出:

 isParallel:false isParallel:true Exception in thread "main" java.lang.UnsupportedOperationException: nope at com.stackoverflow.lambda.CollectorTest.lambda$nope$0(CollectorTest.java:18) at com.stackoverflow.lambda.CollectorTest$$Lambda$3/2001049719.apply(Unknown Source) at java.util.stream.ReduceOps$3ReducingSink.combine(ReduceOps.java:174) at java.util.stream.ReduceOps$3ReducingSink.combine(ReduceOps.java:160) 

因此,根据这个结果,我们可以推断Collector's combiner只能通过并行执行来调用。