Java 8流处理不流畅

我遇到了Java 8流的问题,其中数据是在突然批量处理而不是在请求时处理的。 我有一个相当复杂的流 – 必须并行化,因为我使用concat合并两个流。

我的问题源于这样一个事实:数据似乎在大量分钟内解析 – 有时甚至是数小时。 我希望一旦Stream读取传入数据,就会发生这种处理,以分散工作负载。 批量处理几乎在所有方面都是违反直觉的。

所以,问题是为什么这个批量收集发生以及如何避免它。

我的输入是一个未知大小的Spliterator,我使用forEach作为终端操作。

并行流的基本原则是遭遇顺序不必与处理顺序匹配。 如果需要,这使得能够在组装正确排序的结果的同时处理子列表或子树的项目。 这明确允许批量处理,甚至使其成为有序流的并行处理的必需。

此行为由SpliteratortrySplit实现的特定实现决定。 规范说:

如果此Spliterator是ORDERED ,则返回的Spliterator必须覆盖元素的严格前缀

API注意:

理想的trySplit方法(无遍历)将其元素精确地分成两半,从而实现平衡并行计算。


为什么这个策略在规范中固定而不是,例如偶数/奇数分裂?

好吧,考虑一个简单的用例。 列表将被过滤并收集到新列表中,因此必须保留遭遇顺序。 使用前缀规则,它很容易实现。 拆分前缀,同时过滤两个块,然后将前缀过滤的结果添加到新列表,然后添加过滤后缀。

有了甚至奇怪的策略,这是不可能的。 您可以同时过滤这两个部分,但之后,除非在整个操作过程中跟踪每个项目的位置,否则您不知道如何正确加入结果。

即便如此,加入这些齿轮传动的物品要比每块addAll一个addAll要复杂得多。


如果您有可能必须保留的遭遇订单,您可能已经注意到这一切都只适用。 如果您的分裂器未报告ORDERED特征,则不需要返回前缀。 然而,您可能已经由AbstractSpliteratorinheritance的默认实现旨在与有序分裂器兼容。 因此,如果您想要一个不同的策略,您必须自己实现拆分操作。

或者您使用不同的方式来实现无序流,例如

 Stream.generate(()->{ LockSupport.parkNanos(TimeUnit.SECONDS.toNanos(1)); return Thread.currentThread().getName(); }).parallel().forEach(System.out::println); 

可能更接近你的预期。