在并行流上调用顺序会使所有先前的操作顺序进行

我有一个重要的数据集,并希望调用缓慢但干净的方法,而不是调用带有副作用的快速方法对第一个结果。 我对中间结果不感兴趣,所以我不想收集它们。

显而易见的解决方案是创建并行流,进行慢速呼叫,再次使流顺序,并进行快速呼叫。 问题是,所有代码都在单线程中执行,没有实际的并行性。

示例代码:

@Test public void testParallelStream() throws ExecutionException, InterruptedException { ForkJoinPool forkJoinPool = new ForkJoinPool(Runtime.getRuntime().availableProcessors() * 2); Set threads = forkJoinPool.submit(()-> new Random().ints(100).boxed() .parallel() .map(this::slowOperation) .sequential() .map(Function.identity())//some fast operation, but must be in single thread .collect(Collectors.toSet()) ).get(); System.out.println(threads); Assert.assertEquals(Runtime.getRuntime().availableProcessors() * 2, threads.size()); } private String slowOperation(int value) { try { Thread.sleep(100); } catch (InterruptedException e) { e.printStackTrace(); } return Thread.currentThread().getName(); } 

如果我删除sequential ,代码按预期执行,但显然,非并行操作将在多个线程中调用。

你能推荐一些关于这种行为的参考资料,或者某些方法可以避免临时收集吗?

将流从parallel()切换到sequential()在初始Stream API设计中工作,但是引起了许多问题,最后实现被更改 ,因此它只是为整个管道打开和关闭并行标志。 当前的文档确实含糊不清,但在Java-9中有所改进:

根据调用终端操作的流的模式,顺序地或并行地执行流管道。 可以使用BaseStream.isParallel()方法确定流的顺序或并行模式,并且可以使用BaseStream.sequential()BaseStream.parallel()操作修改流的模式。 最近的顺序或并行模式设置适用于整个流管道的执行。

至于您的问题,您可以将所有内容收集到中间List并启动新的顺序管道:

 new Random().ints(100).boxed() .parallel() .map(this::slowOperation) .collect(Collectors.toList()) // Start new stream here .stream() .map(Function.identity())//some fast operation, but must be in single thread .collect(Collectors.toSet()); 

在当前实现中,Stream是全部并行或全部是顺序的。 虽然Javadoc没有明确说明这一点,但未来它可能会发生变化,但它确实说这是可能的。

S parallel()

返回并行的等效流。 可能会返回自己,因为流已经并行,或者因为基础流状态被修改为并行。

如果您需要单线程函数,我建议您使用Lock或synchronized块/方法。