限制和无序流的内部更改

基本上这是在试图回答另一个问题时出现的。 假设这段代码:

AtomicInteger i = new AtomicInteger(0); AtomicInteger count = new AtomicInteger(0); IntStream.generate(() -> i.incrementAndGet()) .parallel() .peek(x -> count.incrementAndGet()) .limit(5) .forEach(System.out::println); System.out.println("count = " + count); 

我理解IntStream#generate是一个无序的无限流 ,并且要完成它必须有一个短路操作(在这种情况下limit )。 我也理解,在达到该限制之前,Stream实现的次数可以自由调用。

在java-8下运行,打印count总是512 (可能并不总是,但在我的机器上是这样)。

在对比运行中,这在java-10下很少超过5 。 所以我的问题是内部发生了什么变化,短路发生得更好(我试图通过拥有源并试图做一些差异来解决这个问题……)

这种变化发生在Java 9,beta 103和Java 9,beta 120( JDK-8154387 )之间。

负责的类是StreamSpliterators.UnorderedSliceSpliterator.OfInt ,resp。 它的超类StreamSpliterators.UnorderedSliceSpliterator

这个类的旧版本看起来像

 abstract static class UnorderedSliceSpliterator> { static final int CHUNK_SIZE = 1 << 7; // The spliterator to slice protected final T_SPLITR s; protected final boolean unlimited; private final long skipThreshold; private final AtomicLong permits; UnorderedSliceSpliterator(T_SPLITR s, long skip, long limit) { this.s = s; this.unlimited = limit < 0; this.skipThreshold = limit >= 0 ? limit : 0; this.permits = new AtomicLong(limit >= 0 ? skip + limit : skip); } UnorderedSliceSpliterator(T_SPLITR s, UnorderedSliceSpliterator parent) { this.s = s; this.unlimited = parent.unlimited; this.permits = parent.permits; this.skipThreshold = parent.skipThreshold; } 

  @Override public void forEachRemaining(Consumer action) { Objects.requireNonNull(action); ArrayBuffer.OfRef sb = null; PermitStatus permitStatus; while ((permitStatus = permitStatus()) != PermitStatus.NO_MORE) { if (permitStatus == PermitStatus.MAYBE_MORE) { // Optimistically traverse elements up to a threshold of CHUNK_SIZE if (sb == null) sb = new ArrayBuffer.OfRef<>(CHUNK_SIZE); else sb.reset(); long permitsRequested = 0; do { } while (s.tryAdvance(sb) && ++permitsRequested < CHUNK_SIZE); if (permitsRequested == 0) return; sb.forEach(action, acquirePermits(permitsRequested)); } else { // Must be UNLIMITED; let 'er rip s.forEachRemaining(action); return; } } } 

我们可以看到,它试图在每个分裂器中缓冲多达CHUNK_SIZE = 1 << 7元素,这可能最终为“CPU核心数”×128个元素。

相比之下,新版本看起来像

 abstract static class UnorderedSliceSpliterator> { static final int CHUNK_SIZE = 1 << 7; // The spliterator to slice protected final T_SPLITR s; protected final boolean unlimited; protected final int chunkSize; private final long skipThreshold; private final AtomicLong permits; UnorderedSliceSpliterator(T_SPLITR s, long skip, long limit) { this.s = s; this.unlimited = limit < 0; this.skipThreshold = limit >= 0 ? limit : 0; this.chunkSize = limit >= 0 ? (int)Math.min(CHUNK_SIZE, ((skip + limit) / AbstractTask.LEAF_TARGET) + 1) : CHUNK_SIZE; this.permits = new AtomicLong(limit >= 0 ? skip + limit : skip); } UnorderedSliceSpliterator(T_SPLITR s, UnorderedSliceSpliterator parent) { this.s = s; this.unlimited = parent.unlimited; this.permits = parent.permits; this.skipThreshold = parent.skipThreshold; this.chunkSize = parent.chunkSize; } 

...

  @Override public void forEachRemaining(Consumer action) { Objects.requireNonNull(action); ArrayBuffer.OfRef sb = null; PermitStatus permitStatus; while ((permitStatus = permitStatus()) != PermitStatus.NO_MORE) { if (permitStatus == PermitStatus.MAYBE_MORE) { // Optimistically traverse elements up to a threshold of chunkSize if (sb == null) sb = new ArrayBuffer.OfRef<>(chunkSize); else sb.reset(); long permitsRequested = 0; do { } while (s.tryAdvance(sb) && ++permitsRequested < chunkSize); if (permitsRequested == 0) return; sb.forEach(action, acquirePermits(permitsRequested)); } else { // Must be UNLIMITED; let 'er rip s.forEachRemaining(action); return; } } } 

所以现在有一个实例字段chunkSize 。 当存在定义的限制并且表达式((skip + limit) / AbstractTask.LEAF_TARGET) + 1计算值小于CHUNK_SIZE ,将使用该较小的值。 因此,当具有小限制时, chunkSize将小得多。 在您的情况下,限制为5 ,块大小将始终为1