并行flatMap总是顺序的
假设我有这个代码:
Collections.singletonList(10) .parallelStream() // .stream() - nothing changes .flatMap(x -> Stream.iterate(0, i -> i + 1) .limit(x) .parallel() .peek(m -> { System.out.println(Thread.currentThread().getName()); })) .collect(Collectors.toSet());
输出是相同的线程名称,因此这里parallel
没有任何好处 – 我的意思是,有一个线程可以完成所有工作。
在flatMap
里面有这样的代码:
result.sequential().forEach(downstream);
我理解强制sequential
属性如果“外部”流是并行的(它们可能会阻塞),“外部”将不得不等待“flatMap”完成,反过来(因为使用相同的公共池)但是为什么总是强迫呢?
这是那些可能在以后的版本中发生变化的事情之一吗?
有两个不同的方面。
首先,只有一个管道是顺序的或并行的。 在内部流中选择顺序或并行是无关紧要的。 请注意,您在引用的代码段中看到的downstream
消费者代表整个后续流管道,因此在您的代码中,以.collect(Collectors.toSet());
结尾.collect(Collectors.toSet());
,这个消费者最终会将结果元素添加到一个非线程安全的Set
实例中。 因此,与该单个消费者并行处理内部流将破坏整个操作。
如果外部流被拆分,则引用的代码可能会与不同的消费者同时调用,从而添加到不同的集合中。 这些调用中的每一个都将处理外部流映射到不同内部流实例的不同元素。 由于外部流仅由单个元素组成,因此无法拆分。
这个方法已经实现,也是为什么flatMap()之后的filter()在Java流中“不完全”懒惰的原因? 问题,因为forEach
在内部流上调用,它将所有元素传递给下游消费者。 正如这个答案所certificate的那样,支持懒惰和子流分裂的替代实现是可能的。 但这是实现它的一种根本不同的方式。 Stream实现的当前设计主要由消费者组合工作,因此最后,源分裂器(以及从它分离出来的那些)接收一个Consumer
,它代表tryAdvance
或forEachRemaining
的整个流管道。 相比之下,链接答案的解决方案执行分裂器组合,生成一个新的Spliterator
委托给源分裂器。 我想,这两种方法都有优势,我不确定,OpenJDK实现在反过来工作时会失去多少。