Reader#lines()由于其spliterator中的不可配置的批量大小策略而严重并行化
当流源是Reader
时,我无法实现流处理的良好并行化。 在四核CPU上运行下面的代码我首先观察到3个核心,然后突然下降到两个核心,然后是一个核心。 整体CPU利用率徘徊在50%左右。
请注意示例的以下特征:
- 只有6,000行;
- 每条线需要大约20毫秒来处理;
- 整个过程大约需要一分钟。
这意味着所有压力都在CPU上,I / O很小。 这个例子是一个用于自动并行化的坐鸭。
import static java.util.concurrent.TimeUnit.NANOSECONDS; import static java.util.concurrent.TimeUnit.SECONDS; ... class imports elided ... public class Main { static final AtomicLong totalTime = new AtomicLong(); public static void main(String[] args) throws IOException { final long start = System.nanoTime(); final Path inputPath = createInput(); System.out.println("Start processing"); try (PrintWriter w = new PrintWriter(Files.newBufferedWriter(Paths.get("output.txt")))) { Files.lines(inputPath).parallel().map(Main::processLine) .forEach(w::println); } final double cpuTime = totalTime.get(), realTime = System.nanoTime()-start; final int cores = Runtime.getRuntime().availableProcessors(); System.out.println(" Cores: " + cores); System.out.format(" CPU time: %.2f s\n", cpuTime/SECONDS.toNanos(1)); System.out.format(" Real time: %.2f s\n", realTime/SECONDS.toNanos(1)); System.out.format("CPU utilization: %.2f%%", 100.0*cpuTime/realTime/cores); } private static String processLine(String line) { final long localStart = System.nanoTime(); double ret = 0; for (int i = 0; i < line.length(); i++) for (int j = 0; j < line.length(); j++) ret += Math.pow(line.charAt(i), line.charAt(j)/32.0); final long took = System.nanoTime()-localStart; totalTime.getAndAdd(took); return NANOSECONDS.toMillis(took) + " " + ret; } private static Path createInput() throws IOException { final Path inputPath = Paths.get("input.txt"); try (PrintWriter w = new PrintWriter(Files.newBufferedWriter(inputPath))) { for (int i = 0; i < 6_000; i++) { final String text = String.valueOf(System.nanoTime()); for (int j = 0; j < 25; j++) w.print(text); w.println(); } } return inputPath; } }
我的典型输出:
Cores: 4 CPU time: 110.23 s Real time: 53.60 s CPU utilization: 51.41%
为了比较,如果我使用稍微修改的变体,我首先收集到列表然后处理:
Files.lines(inputPath).collect(toList()).parallelStream().map(Main::processLine) .forEach(w::println);
我得到这个典型的输出:
Cores: 4 CPU time: 138.43 s Real time: 35.00 s CPU utilization: 98.87%
有什么可以解释这种影响,我该如何解决它以获得充分利用?
请注意,我最初在servlet输入流的读者上观察到这一点,因此它并不特定于FileReader
。
这是答案,在Spliterators.IteratorSpliterator
的源代码中Spliterators.IteratorSpliterator
, BufferedReader#lines()
使用的代码:
@Override public Spliterator trySplit() { /* * Split into arrays of arithmetically increasing batch * sizes. This will only improve parallel performance if * per-element Consumer actions are more costly than * transferring them into an array. The use of an * arithmetic progression in split sizes provides overhead * vs parallelism bounds that do not particularly favor or * penalize cases of lightweight vs heavyweight element * operations, across combinations of #elements vs #cores, * whether or not either are known. We generate * O(sqrt(#elements)) splits, allowing O(sqrt(#cores)) * potential speedup. */ Iterator extends T> i; long s; if ((i = it) == null) { i = it = collection.iterator(); s = est = (long) collection.size(); } else s = est; if (s > 1 && i.hasNext()) { int n = batch + BATCH_UNIT; if (n > s) n = (int) s; if (n > MAX_BATCH) n = MAX_BATCH; Object[] a = new Object[n]; int j = 0; do { a[j] = i.next(); } while (++j < n && i.hasNext()); batch = j; if (est != Long.MAX_VALUE) est -= j; return new ArraySpliterator<>(a, 0, j, characteristics); } return null; }
同样值得注意的是常数:
static final int BATCH_UNIT = 1 << 10; // batch array size increment static final int MAX_BATCH = 1 << 25; // max batch array size;
因此,在我的示例中,我使用了6,000个元素,因为批量大小步长为1024,所以我只获得了三个批次。这正好解释了我的观察结果,最初使用了三个核心,在较小的批次完成时降至两个然后一个。 与此同时,我尝试了一个包含60,000个元素的修改示例,然后我获得了几乎100%的CPU利用率。
为了解决我的问题,我开发了下面的代码,它允许我将任何现有的流转换为Spliterator#trySplit
将其分割成指定大小的批次的流。 从我的问题中将它用于用例的最简单方法是这样的:
toFixedBatchStream(Files.newBufferedReader(inputPath).lines(), 20)
在较低级别,下面的类是spliterator包装器,它更改包装的spliterator的trySplit
行为并保持其他方面不变。
import static java.util.Spliterators.spliterator; import static java.util.stream.StreamSupport.stream; import java.util.Comparator; import java.util.Spliterator; import java.util.function.Consumer; import java.util.stream.Stream; public class FixedBatchSpliteratorWrapper implements Spliterator { private final Spliterator spliterator; private final int batchSize; private final int characteristics; private long est; public FixedBatchSpliteratorWrapper(Spliterator toWrap, long est, int batchSize) { final int c = toWrap.characteristics(); this.characteristics = (c & SIZED) != 0 ? c | SUBSIZED : c; this.spliterator = toWrap; this.est = est; this.batchSize = batchSize; } public FixedBatchSpliteratorWrapper(Spliterator toWrap, int batchSize) { this(toWrap, toWrap.estimateSize(), batchSize); } public static Stream toFixedBatchStream(Stream in, int batchSize) { return stream(new FixedBatchSpliteratorWrapper<>(in.spliterator(), batchSize), true); } @Override public Spliterator trySplit() { final HoldingConsumer holder = new HoldingConsumer<>(); if (!spliterator.tryAdvance(holder)) return null; final Object[] a = new Object[batchSize]; int j = 0; do a[j] = holder.value; while (++j < batchSize && tryAdvance(holder)); if (est != Long.MAX_VALUE) est -= j; return spliterator(a, 0, j, characteristics()); } @Override public boolean tryAdvance(Consumer super T> action) { return spliterator.tryAdvance(action); } @Override public void forEachRemaining(Consumer super T> action) { spliterator.forEachRemaining(action); } @Override public Comparator super T> getComparator() { if (hasCharacteristics(SORTED)) return null; throw new IllegalStateException(); } @Override public long estimateSize() { return est; } @Override public int characteristics() { return characteristics; } static final class HoldingConsumer implements Consumer { Object value; @Override public void accept(T value) { this.value = value; } } }
在Java-9早期访问版本中,此问题在某种程度上已得到修复。 Files.lines
被重写,现在在拆分时它实际上跳转到内存映射文件的中间。 这是我的机器上的结果(它有4个HyperThreading核心= 8个硬件线程):
Java 8u60:
Start processing Cores: 8 CPU time: 73,50 s Real time: 36,54 s CPU utilization: 25,15%
Java 9b82:
Start processing Cores: 8 CPU time: 79,64 s Real time: 10,48 s CPU utilization: 94,95%
如您所见,实时和CPU利用率都得到了极大的提高。
这种优化有一些局限性。 目前它仅适用于几种编码( 即 UTF-8,ISO_8859_1和US_ASCII),对于任意编码,您不确切知道如何编码换行符。 它仅限于不超过2Gb大小的文件(由于Java中MappedByteBuffer
限制),当然不适用于某些非常规文件(如字符设备,无法进行内存映射的命名管道)。 在这种情况下,旧实现用作后备。
流的并行执行基于fork-join模型。 对于有序流 ,并行执行仅在流可以拆分为多个部分时严格相互跟随才有效。 通常,使用BufferedReader生成的流是不可能的。 但是,从理论上讲,对于无序流,应该可以并行执行:
BufferedReader reader = ...; reader.lines().unordered().map(...);
我不确定BufferedReader返回的流是否支持这种并行执行。 一个非常简单的选择是创建一个中间列表:
BufferedReader reader = ...; reader.lines().collect(toList()).parallelStream().map(...);
在这种情况下,并行执行在读取所有行之后开始。 如果读取线条需要很长时间,这可能是一个问题。 在这种情况下,我建议使用ExecutorService进行并行执行而不是并行流 :
ExecutorService executor = ...; BufferedReader reader = ...; reader.lines() .map(line -> executor.submit(() -> ... line ...)) .collect(toList()) .stream() .map(future -> future.get()) .map(...);
要找到这个的真正原因,您需要深入了解调用BufferedReader.lines()
的Files.lines()
源代码,该代码如下:
public Stream lines() { Iterator iter = new Iterator () { String nextLine = null; @Override public boolean hasNext() { if (nextLine != null) { return true; } else { try { nextLine = readLine(); return (nextLine != null); } catch (IOException e) { throw new UncheckedIOException(e); } } } @Override public String next() { if (nextLine != null || hasNext()) { String line = nextLine; nextLine = null; return line; } else { throw new NoSuchElementException(); } } }; return StreamSupport.stream(Spliterators.spliteratorUnknownSize( iter, Spliterator.ORDERED | Spliterator.NONNULL), false); }
这里它返回一个Stream
,它是:
- 大小不明
- 有序
- 不是空的
- 不平行(
StreamSupport.stream()
末尾的false
参数
因此我真的不确定它是否可以被平行化,这可以通过进一步挖掘源来找到。
我所知道的是Java API中明确提供了并行流。 以List
为例,它有一个List.stream()
和List.parallelStream()
方法。