Java中无限流的并行处理

为什么下面的代码不打印任何输出,而如果我们删除并行,它打印0,1?

IntStream.iterate(0, i -> ( i + 1 ) % 2) .parallel() .distinct() .limit(10) .forEach(System.out::println); 

虽然我知道理想的限制应该放在不同之前,但我的问题更多地与添加并行处理引起的差异有关。

真正的原因是有序并行 .distinct()是完整的屏障操作, 如文档中所述:

保持并行管道中distinct()稳定性相对昂贵(要求操作充当完全屏障,具有大量缓冲开销),并且通常不需要稳定性。

“全屏障操作”意味着必须在下游开始之前执行所有上游操作。 Stream API中只有两个完整的屏障操作: .sorted() (每次)和.distinct() (按顺序并行的情况)。 由于你有非短路的无限流提供给.distinct()你最终会得到无限循环。 通过契约.distinct()不能以任何顺序向下游发射元素:它应该总是发出第一个重复元素。 虽然理论上可以更好地实现并行有序.distinct() ,但实现起来要复杂得多。

至于解决方案,@ user140547是正确的:在.unordered()之前添加.unordered().distinct() distinct()算法切换为无序的( .distinct()使用共享的ConcurrentHashMap来存储所有观察到的元素并将每个新元素发送到下游)。 请注意,在.unordered() 之后添加.unordered() .distinct()将无济于事。

Stream.iterate返回’无限顺序有序流’。 因此,使顺序流并行不是太有用。

根据Stream包的描述:

对于并行流,放宽排序约束有时可以实现更高效的执行。 如果元素的排序不相关,则可以更有效地实现某些聚合操作,例如过滤重复(distinct())或分组缩减(Collectors.groupingBy())。 类似地,与遇到订单本质上相关的操作(例如limit())可能需要缓冲以确保正确排序,从而破坏并行性的好处。 在流具有遭遇顺序但用户不特别关心该遭遇顺序的情况下,使用无序()明确地对流进行排序可以改善某些有状态或终端操作的并行性能。 然而,大多数流管道,例如上面的“块的权重总和”示例,即使在排序约束下仍然有效地并行化。

在您的情况下似乎是这种情况,使用无序(),它打印0,1。

  IntStream.iterate(0, i -> (i + 1) % 2) .parallel() .unordered() .distinct() .limit(10) .forEach(System.out::println); 

我知道代码不正确,并且在解决方案中也是如此,如果我们在不同之前移动限制,我们就不会有无限循环。

并行函数使用fork和join概念来分配工作,它为工作分配所有可用的线程,而不是单个线程。

我们正确地期待无限循环,因为multithreading无限地处理数据并且没有什么能阻止它们,因为10的限制永远不会在明显之后发生。

它可能会继续尝试fork并且永远不会尝试加入以向前移动它。 但我仍然相信它在java中的缺陷比其他任何东西都要多。

这个代码有一个主要问题,即使没有并行:在.distinct()之后,流只有2个元素 – 因此限制永远不会踢 – 它会打印两个元素然后继续浪费你的CPU时间无限期。 这可能是你想要的。

在平行和限制的情况下,我认为由于工作分工的方式,问题更加严重。 我没有完全跟踪并行流代码,但这是我的猜测:

并行代码在多个线程之间划分工作,这些线程都无限期地突然显示,因为它们从未填充配额。 系统可能会等待每个线程完成,因此它可以将它们的结果组合起来以确保顺序的清晰度 – 但是在您提供的情况下这种情况永远不会发生。

如果没有订单要求,则可以在针对全局清晰度集进行检查后立即使用每个工作线程的结果。

没有限制,我怀疑使用不同的代码来处理无限流:而不是等待所需的10填充,结果报告为已发现。 它有点像制作报告hasNext()= true的迭代器,首先产生0,然后是1,然后next()调用永久挂起而不产生结果 – 在并行情况下,某些东西正在等待多个报告,因此它可以正确组合/在输出之前对它们进行排序,而在串行情况下,它会执行它可以挂起的内容。

我试图找到有和没有distinct()或limit()的调用堆栈的确切差异,但到目前为止,似乎很难导航相当复杂的流库调用序列。