遇到订单友好/不友好的终端操作与并行/顺序与有序/无序流

受这个问题的启发,我开始玩有序与无序流,并行与顺序流和终端操作,这些操作遵循不受其尊重的遭遇顺序与终端操作。

在对链接问题的一个答案中,显示了与此类似的代码:

List ordered = Arrays.asList( 1, 2, 3, 4, 4, 3, 2, 1, 1, 2, 3, 4, 4, 3, 2, 1, 1, 2, 3, 4); List result = new CopyOnWriteArrayList(); ordered.parallelStream().forEach(result::add); System.out.println(ordered); System.out.println(result); 

这些名单确实不同。 unordered列表甚至从一次运行变为另一次运行,表明结果实际上是非确定性的。

所以我创建了另一个例子:

 CopyOnWriteArrayList result2 = ordered.parallelStream() .unordered() .collect(Collectors.toCollection(CopyOnWriteArrayList::new)); System.out.println(ordered); System.out.println(result2); 

我希望看到类似的结果,因为流是并行和无序的(可能是unordered()是多余的,因为它已经是并行的)。 但是,生成的列表是有序的,即它等于源列表。

所以我的问题是为什么收集的清单是有序的? collect总是尊重遭遇顺序,即使对于平行,无序的流? 它是特定的Collectors.toCollection(...)收集器强制遭遇订单吗?

Collectors.toCollection返回一个缺少Collector.Characteristics.UNORDERED特征的Collector.Characteristics.UNORDERED 。 另一个指定Collector.Characteristics.UNORDERED可能表现不同。

这就是说:“无序”意味着没有保证 ,不能保证变化。 如果库发现最容易按顺序处理无序集合,则允许这样做,并且允许该行为在周二或者满月时将释放更改为释放。

(另请注意,如果您要使用并行流, Collectors.toCollection不要求您使用并发集合实现; toCollection(ArrayList::new)可以正常工作。这是因为收集器没有Collector.Characteristics.CONCURRENT特性,因此它使用一种集合策略,即使对于并行流,它也适用于非并发集合。)

如果你使用无序流但是收集器不是UNORDERED ,反之亦然,我怀疑你从框架得到任何保证。 如果有一张桌子,它就会说“这里的龙是没有行为的。” 我也期望在这里对不同类型的链接操作有一些区别,例如Eugene提到findFirst在这里有所不同,即使findFirst本质上是一个有序操作 – unordered().findFirst() findAny()变得等同于findAny()

对于Stream.collect ,我相信当前的实现有三种策略:

  • 顺序:启动一个累加器,将元素累积到其中(按照遭遇顺序,因为你为什么要打扰工作来洗牌元素?按照你得到它们的顺序接受它们),调用整理器。
  • 并行执行,并发收集器以及流或收集器是无序的:一个累加器,分片输入,工作线程处理每个分片中的元素,并在它们准备好时将元素添加到累加器,调用整理器。
  • 并行执行,其他任何事情:将输入分片为N个分片,每个分片按顺序累积到其自己的不同累加器中,累加器与组合器function结合,调用整理器。

当前的实现中,我检查了java-8和java-9,非并发收集器的collect阶段忽略了无序标志(未设置Collector.Characteristics.UNORDERED )。 允许实现,以某种方式类似的问题 。

在您链接的同一个问题中,我确实提供了findFirst如何从jdk-8 实际更改为jdk-9的示例。

文档Stream#collect已经提到:

当并行执行时,可以实例化填充合并多个中间结果,以便保持可变数据结构的隔离。 因此,即使与非线程安全数据结构(例如ArrayList)并行执行, 也不需要额外的同步来进行并行缩减。

这意味着Stream#collect做了两个主要的事情: splitmerge

但我在jdk-8中有一个特殊的例子,你可以获取不同的结果,:)。 当通过Stream#generate创建无序流时,您可以在Collectors#toList上获取不同的结果,例如:

 Set> result = IntStream.range(0, 10).mapToObj(__ -> { return unordered().parallel().collect(toSet()); }).collect(toSet()); assert result.each.size() == 100000; // ok // v--- surprised, it was pass assert result.size() > 1; 

 Stream unordered() { AtomicInteger counter = new AtomicInteger(); return Stream.generate(counter::getAndIncrement).limit(10000); }