如何在Stream上短路reduce()操作?

这与如何在Stream上短路减少基本相同? 。 但是,由于该问题集中在一个布尔值流,并且其答案不能推广到其他类型并减少操作,我想问更一般的问题。

我们如何在流上进行减少,以便在遇到减少操作的吸收元件时使其短路?

乘法的典型数学例子是0。 这个Stream

 int product = IntStream.of(2, 3, 4, 5, 0, 7, 8) .reduce(1, (a, b) -> a * b); 

将消耗最后两个元素( 78 ),无论一旦遇到0 ,产品是已知的。

不幸的是,Stream API具有创建自己的短路操作的有限能力。 不太干净的解决方案是抛出RuntimeException并捕获它。 这是IntStream的实现,但它也可以推广到其他流类型:

 public static int reduceWithCancelEx(IntStream stream, int identity, IntBinaryOperator combiner, IntPredicate cancelCondition) { class CancelException extends RuntimeException { private final int val; CancelException(int val) { this.val = val; } } try { return stream.reduce(identity, (a, b) -> { int res = combiner.applyAsInt(a, b); if(cancelCondition.test(res)) throw new CancelException(res); return res; }); } catch (CancelException e) { return e.val; } } 

用法示例:

 int product = reduceWithCancelEx( IntStream.of(2, 3, 4, 5, 0, 7, 8).peek(System.out::println), 1, (a, b) -> a * b, val -> val == 0); System.out.println("Result: "+product); 

输出:

 2 3 4 5 0 Result: 0 

请注意,即使它适用于并行流,也不能保证其中一个并行任务会在其中一个引发exception时立即完成。 已经启动的子任务可能会一直运行到完成,因此您可以处理比预期更多的元素。

更新 :替代解决方案,更长,但更平行友好。 它基于自定义分裂器,它最多返回一个元素,这是所有底层元素积累的结果)。 在顺序模式下使用它时,它会在单个tryAdvance调用中完成所有工作。 拆分时,每个部件都会生成对应的单个部分结果,使用组合器function由Stream引擎减少。 这是通用版本,但原始专业化也是可能的。

 final static class CancellableReduceSpliterator implements Spliterator, Consumer, Cloneable { private Spliterator source; private final BiFunction accumulator; private final Predicate cancelPredicate; private final AtomicBoolean cancelled = new AtomicBoolean(); private A acc; CancellableReduceSpliterator(Spliterator source, A identity, BiFunction accumulator, Predicate cancelPredicate) { this.source = source; this.acc = identity; this.accumulator = accumulator; this.cancelPredicate = cancelPredicate; } @Override public boolean tryAdvance(Consumer action) { if (source == null || cancelled.get()) { source = null; return false; } while (!cancelled.get() && source.tryAdvance(this)) { if (cancelPredicate.test(acc)) { cancelled.set(true); break; } } source = null; action.accept(acc); return true; } @Override public void forEachRemaining(Consumer action) { tryAdvance(action); } @Override public Spliterator trySplit() { if(source == null || cancelled.get()) { source = null; return null; } Spliterator prefix = source.trySplit(); if (prefix == null) return null; try { @SuppressWarnings("unchecked") CancellableReduceSpliterator result = (CancellableReduceSpliterator) this.clone(); result.source = prefix; return result; } catch (CloneNotSupportedException e) { throw new InternalError(); } } @Override public long estimateSize() { // let's pretend we have the same number of elements // as the source, so the pipeline engine parallelize it in the same way return source == null ? 0 : source.estimateSize(); } @Override public int characteristics() { return source == null ? SIZED : source.characteristics() & ORDERED; } @Override public void accept(T t) { this.acc = accumulator.apply(this.acc, t); } } 

类似于Stream.reduce(identity, accumulator, combiner)Stream.reduce(identity, combiner) ,但是使用cancelPredicate

 public static  U reduceWithCancel(Stream stream, U identity, BiFunction accumulator, BinaryOperator combiner, Predicate cancelPredicate) { return StreamSupport .stream(new CancellableReduceSpliterator<>(stream.spliterator(), identity, accumulator, cancelPredicate), stream.isParallel()).reduce(combiner) .orElse(identity); } public static  T reduceWithCancel(Stream stream, T identity, BinaryOperator combiner, Predicate cancelPredicate) { return reduceWithCancel(stream, identity, combiner, combiner, cancelPredicate); } 

让我们测试两个版本并计算实际处理的元素数量。 让我们把0接近结束。 例外版本:

 AtomicInteger count = new AtomicInteger(); int product = reduceWithCancelEx( IntStream.range(-1000000, 100).filter(x -> x == 0 || x % 2 != 0) .parallel().peek(i -> count.incrementAndGet()), 1, (a, b) -> a * b, x -> x == 0); System.out.println("product: " + product + "/count: " + count); Thread.sleep(1000); System.out.println("product: " + product + "/count: " + count); 

典型输出:

 product: 0/count: 281721 product: 0/count: 500001 

因此,当仅处理某些元素时返回结果时,任务继续在后台工作,并且计数器仍在增加。 这是分裂器版本:

 AtomicInteger count = new AtomicInteger(); int product = reduceWithCancel( IntStream.range(-1000000, 100).filter(x -> x == 0 || x % 2 != 0) .parallel().peek(i -> count.incrementAndGet()).boxed(), 1, (a, b) -> a * b, x -> x == 0); System.out.println("product: " + product + "/count: " + count); Thread.sleep(1000); System.out.println("product: " + product + "/count: " + count); 

典型输出:

 product: 0/count: 281353 product: 0/count: 281353 

返回结果时,所有任务实际上都已完成。

可以使用流的分离器来实现一般的短路静态降低方法。 它甚至变得不复杂! 当人们希望以更灵活的方式使用蒸汽时,使用分裂器似乎是很多次。

 public static  T reduceWithCancel(Stream s, T acc, BinaryOperator op, Predicate cancelPred) { BoxConsumer box = new BoxConsumer(); Spliterator splitr = s.spliterator(); while (!cancelPred.test(acc) && splitr.tryAdvance(box)) { acc = op.apply(acc, box.value); } return acc; } public static class BoxConsumer implements Consumer { T value = null; public void accept(T t) { value = t; } } 

用法:

  int product = reduceWithCancel( Stream.of(1, 2, 0, 3, 4).peek(System.out::println), 1, (acc, i) -> acc * i, i -> i == 0); System.out.println("Result: " + product); 

输出:

 1 2 0 Result: 0 

该方法可以推广到执行其他类型的终端操作。

这是基于这个关于一个接管操作的答案 。

我对此并行化潜力一无所知。

我自己的看法是不使用reduce()本身,而是使用现有的短路最终操作。

当使用具有副作用的谓词时,可以使用noneMatch()或allMatch()。 诚然,也不是最干净的解决方案,但确实达到了目标:

 AtomicInteger product = new AtomicInteger(1); IntStream.of(2, 3, 4, 5, 0, 7, 8) .peek(System.out::println) .noneMatch(i -> { if (i == 0) { product.set(0); return true; } int oldValue = product.get(); while (oldValue != 0 && !product.compareAndSet(oldValue, i * oldValue)) { oldValue = product.get(); } return oldValue == 0; }); System.out.println("Result: " + product.get()); 

它短路并且可以并联。