为什么Files.list()并行流的执行速度比使用Collection.parallelStream()慢得多?
以下代码片段是获取目录列表的方法的一部分,在每个文件上调用extract方法并将生成的药物对象序列化为xml。
try(Stream paths = Files.list(infoDir)) { paths .parallel() .map(this::extract) .forEachOrdered(drug -> { try { marshaller.write(drug); } catch (JAXBException ex) { ex.printStackTrace(); } }); }
这是完全相同的完全相同的代码,但使用普通的.list()
调用来获取目录列表并在结果列表中调用.parallelStream()
。
Arrays.asList(infoDir.toFile().list()) .parallelStream() .map(f -> infoDir.resolve(f)) .map(this::extract) .forEachOrdered(drug -> { try { marshaller.write(drug); } catch (JAXBException ex) { ex.printStackTrace(); } });
我的机器是四核MacBook Pro,Java v 1.8.0_60(内置1.8.0_60-b27)。
我正在处理~7000个文件。 平均3次运行:
第一个版本:使用.parallel()
:20秒。 没有.parallel()
:41秒
第二个版本:使用.parallelStream()
:12秒。 使用.stream()
:41秒。
并行模式下的那8秒似乎是一个巨大的差异,因为从流中读取并执行所有繁重工作的extract
方法和执行最终写入的write
调用都没有改变。
问题是Stream API的当前实现以及IteratorSpliterator
的当前实现对于未知大小的源严重地将这些源拆分为并行任务。 你很幸运拥有超过1024个文件,否则你将没有任何并行化的好处。 当前Stream API实现考虑了Spliterator
返回的estimateSize()
值。 未知大小的IteratorSpliterator
在拆分之前返回Long.MAX_VALUE
,其后缀也始终返回Long.MAX_VALUE
。 它的分裂策略如下:
- 定义当前批次大小。 当前公式是从1024个元素开始并以算术方式(
MAX_BATCH
等)增加,直到达到MAX_BATCH
大小(即33554432个元素)。 - 将输入元素(在您的情况下为Paths)消耗到数组中,直到达到批处理大小或输入耗尽为止。
- 返回一个
ArraySpliterator
迭代创建的数组作为前缀,将自身ArraySpliterator
后缀。
假设你有7000个文件。 Stream API会询问估计的大小, IteratorSpliterator
返回Long.MAX_VALUE
。 好的,Stream API要求IteratorSpliterator
进行拆分,它会从底层DirectoryStream
收集1024个元素到数组,然后拆分到ArraySpliterator
(估计大小为1024)和它自己(估计大小仍为Long.MAX_VALUE
)。 由于Long.MAX_VALUE
远远超过1024,因此Stream API决定继续拆分较大的部分,甚至不试图拆分较小的部分。 所以整体分裂树是这样的:
IteratorSpliterator (est. MAX_VALUE elements) | | ArraySpliterator (est. 1024 elements) IteratorSpliterator (est. MAX_VALUE elements) | | /---------------/ | | | ArraySpliterator (est. 2048 elements) IteratorSpliterator (est. MAX_VALUE elements) | | /---------------/ | | | ArraySpliterator (est. 3072 elements) IteratorSpliterator (est. MAX_VALUE elements) | | /---------------/ | | | ArraySpliterator (est. 856 elements) IteratorSpliterator (est. MAX_VALUE elements) | (split returns null: refuses to split anymore)
所以在那之后你要执行五个并行任务:实际上包含1024,2048,3072,856和0个元素。 请注意,即使最后一个块有0个元素,它仍然会报告它估计有Long.MAX_VALUE
元素,因此Stream API也会将它发送到ForkJoinPool
。 糟糕的是,Stream API认为前四个任务的进一步拆分是无用的,因为它们的估计大小要小得多。 所以你得到的是非常不均匀的输入分割,最大利用四个CPU内核(即使你有更多)。 如果每个元素的处理对于任何元素大致相同,那么整个过程将等待最大部分(3072个元素)完成。 所以最大加速可能是7000/3072 = 2.28x。 因此,如果顺序处理需要41秒,那么并行流将需要大约41 / 2.28 = 18秒(这接近您的实际数字)。
您的解决方案完全没问题。 请注意,使用Files.list().parallel()
还可以将所有输入Path
元素存储在内存中(在ArraySpliterator
对象中)。 因此,如果手动将它们转储到List
则不会浪费更多内存。 像ArrayList
(当前由Collectors.toList()
创建Collectors.toList()
数组支持的列表实现可以均匀地分割而没有任何问题,这导致额外的加速。
为什么这种情况没有优化? 当然,这不是一个不可能的问题(尽管实施可能非常棘手)。 对于JDK开发人员来说,这似乎不是高优先级问题。 在邮件列表中有关于此主题的几个讨论。 你可以在这里阅读Paul Sandoz的消息, 在那里他评论我的优化工作。
作为替代方案,您可以使用专为DirectoryStream
量身定制的自定义分割器:
public class DirectorySpliterator implements Spliterator { Iterator iterator; long est; private DirectorySpliterator(Iterator iterator, long est) { this.iterator = iterator; this.est = est; } @Override public boolean tryAdvance(Consumer super Path> action) { if (iterator == null) { return false; } Path path; try { synchronized (iterator) { if (!iterator.hasNext()) { iterator = null; return false; } path = iterator.next(); } } catch (DirectoryIteratorException e) { throw new UncheckedIOException(e.getCause()); } action.accept(path); return true; } @Override public Spliterator trySplit() { if (iterator == null || est == 1) return null; long e = this.est >>> 1; this.est -= e; return new DirectorySpliterator(iterator, e); } @Override public long estimateSize() { return est; } @Override public int characteristics() { return DISTINCT | NONNULL; } public static Stream list(Path parent) throws IOException { DirectoryStream ds = Files.newDirectoryStream(parent); int splitSize = Runtime.getRuntime().availableProcessors() * 8; DirectorySpliterator spltr = new DirectorySpliterator(ds.iterator(), splitSize); return StreamSupport.stream(spltr, false).onClose(() -> { try { ds.close(); } catch (IOException e) { throw new UncheckedIOException(e); } }); } }
只需用DirectorySpliterator.list
替换Files.list
,它将平行并行化,无需任何中间缓冲。 这里我们使用DirectoryStream
生成一个没有任何特定顺序的目录列表这一事实,因此每个并行线程只会从中获取一个后续条目(以同步方式,因为我们已经有了同步IO操作,额外的同步几乎没有任何开销)。 并行顺序每次都会不同(即使使用了forEachOrdered
),但Files.list()
也不保证顺序。
这里唯一不重要的部分是要创建多少并行任务。 由于我们在遍历它之前不知道文件夹中有多少文件,因此最好使用availableProcessors()
作为基础。 我创建了大约8 x availableProcessors()
处理器8 x availableProcessors()
单个任务,这似乎是一个很好的细粒度/粗粒度的折衷:如果每个元素处理不均匀,比处理器有更多的任务将有助于平衡负载。
解决方法的另一种替代方法是在流上使用.collect(Collectors.toList()).parallelStream()
try(Stream paths = Files.list(infoDir)) { paths .collect(Collectors.toList()) .parallelStream() .map(this::extract) .forEachOrdered(drug -> { try { marshaller.write(drug); } catch (JAXBException ex) { ex.printStackTrace(); } }); }
有了这个,你不需要调用.map(f -> infoDir.resolve(f))
,性能应该类似于你的第二个解决方案。