遇到订单友好/不友好的终端操作与并行/顺序与有序/无序流
受这个问题的启发,我开始玩有序与无序流,并行与顺序流和终端操作,这些操作遵循不受其尊重的遭遇顺序与终端操作。
在对链接问题的一个答案中,显示了与此类似的代码:
List ordered = Arrays.asList( 1, 2, 3, 4, 4, 3, 2, 1, 1, 2, 3, 4, 4, 3, 2, 1, 1, 2, 3, 4); List result = new CopyOnWriteArrayList(); ordered.parallelStream().forEach(result::add); System.out.println(ordered); System.out.println(result);
这些名单确实不同。 unordered
列表甚至从一次运行变为另一次运行,表明结果实际上是非确定性的。
所以我创建了另一个例子:
CopyOnWriteArrayList result2 = ordered.parallelStream() .unordered() .collect(Collectors.toCollection(CopyOnWriteArrayList::new)); System.out.println(ordered); System.out.println(result2);
我希望看到类似的结果,因为流是并行和无序的(可能是unordered()
是多余的,因为它已经是并行的)。 但是,生成的列表是有序的,即它等于源列表。
所以我的问题是为什么收集的清单是有序的? collect
总是尊重遭遇顺序,即使对于平行,无序的流? 它是特定的Collectors.toCollection(...)
收集器强制遭遇订单吗?
Collectors.toCollection
返回一个缺少Collector.Characteristics.UNORDERED
特征的Collector.Characteristics.UNORDERED
。 另一个指定Collector.Characteristics.UNORDERED
可能表现不同。
这就是说:“无序”意味着没有保证 ,不能保证变化。 如果库发现最容易按顺序处理无序集合,则允许这样做,并且允许该行为在周二或者满月时将释放更改为释放。
(另请注意,如果您要使用并行流, Collectors.toCollection
不要求您使用并发集合实现; toCollection(ArrayList::new)
可以正常工作。这是因为收集器没有Collector.Characteristics.CONCURRENT
特性,因此它使用一种集合策略,即使对于并行流,它也适用于非并发集合。)
如果你使用无序流但是收集器不是UNORDERED
,反之亦然,我怀疑你从框架得到任何保证。 如果有一张桌子,它就会说“这里的龙是没有行为的。” 我也期望在这里对不同类型的链接操作有一些区别,例如Eugene提到findFirst
在这里有所不同,即使findFirst
本质上是一个有序操作 – unordered().findFirst()
findAny()
变得等同于findAny()
。
对于Stream.collect
,我相信当前的实现有三种策略:
- 顺序:启动一个累加器,将元素累积到其中(按照遭遇顺序,因为你为什么要打扰工作来洗牌元素?按照你得到它们的顺序接受它们),调用整理器。
- 并行执行,并发收集器以及流或收集器是无序的:一个累加器,分片输入,工作线程处理每个分片中的元素,并在它们准备好时将元素添加到累加器,调用整理器。
- 并行执行,其他任何事情:将输入分片为N个分片,每个分片按顺序累积到其自己的不同累加器中,累加器与组合器function结合,调用整理器。
在当前的实现中,我检查了java-8和java-9,非并发收集器的collect
阶段忽略了无序标志(未设置Collector.Characteristics.UNORDERED
)。 允许实现,以某种方式类似的问题 。
在您链接的同一个问题中,我确实提供了findFirst
如何从jdk-8 实际更改为jdk-9的示例。
文档Stream#collect已经提到:
当并行执行时,可以实例化 , 填充和合并多个中间结果,以便保持可变数据结构的隔离。 因此,即使与非线程安全数据结构(例如ArrayList)并行执行, 也不需要额外的同步来进行并行缩减。
这意味着Stream#collect
做了两个主要的事情: split
和merge
。
但我在jdk-8中有一个特殊的例子,你可以获取不同的结果,:)。 当通过Stream#generate创建无序流时,您可以在Collectors#toList
上获取不同的结果,例如:
Set> result = IntStream.range(0, 10).mapToObj(__ -> { return unordered().parallel().collect(toSet()); }).collect(toSet()); assert result.each.size() == 100000; // ok // v--- surprised, it was pass assert result.size() > 1;
Stream unordered() { AtomicInteger counter = new AtomicInteger(); return Stream.generate(counter::getAndIncrement).limit(10000); }