流状态计算:累积总和

假设我有一个Java IntStream,是否可以将它转换为具有累积总和的IntStream? 例如,以[4,2,6,…]开头的流应转换为[4,6,12,…]。

更一般地说,应该如何实现有状态流操作? 感觉这应该是可能的:

myIntStream.map(new Function { int sum = 0; Integer apply(Integer value){ return sum += value; } ); 

有明显的限制,这只适用于顺序流。 但是,Stream.map明确需要无状态映射函数。 我是否正确错过了Stream.statefulMap或Stream.cumulative操作,还是缺少Java流的重点?

比较一下Haskell,其中scanl1函数正好解决了这个例子:

 scanl1 (+) [1 2 3 4] = [1 3 6 10] 

你可以用primefaces序数做到这一点。 例如:

 import java.util.concurrent.atomic.AtomicLong; import java.util.stream.IntStream; import java.util.stream.LongStream; public class Accumulator { public static LongStream toCumulativeSumStream(IntStream ints){ AtomicLong sum = new AtomicLong(0); return ints.sequential().mapToLong(sum::addAndGet); } public static void main(String[] args){ LongStream sums = Accumulator.toCumulativeSumStream(IntStream.range(1, 5)); sums.forEachOrdered(System.out::println); } } 

这输出:

 1 3 6 10 

我使用Long来存储总和,因为完全有可能两个整数加起来远远超过Integer.MAX_VALUE ,并且long很少有溢出的可能性。

可以使用收集器然后创建新流:

 class Accumulator { public static void accept(List list, Integer value) { list.add(value + (list.isEmpty() ? 0 : list.get(list.size() - 1))); } public static List combine(List list1, List list2) { int total = list1.get(list1.size() - 1); list2.stream().map(n -> n + total).forEach(list1::add); return list1; } } 

这用作:

 myIntStream.parallel() .collect(ArrayList::new, Accumulator::accept, Accumulator::combine) .stream(); 

希望你可以看到这个收集器的重要属性是,即使流是并行的,因为Accumulator实例被组合,它会调整总数。

这显然不如映射操作有效,因为它收集整个流然后生成新流。 但这不仅仅是一个实现细节:它是流可能同时处理的事实的必要function。

我用IntStream.range(0, 10000).parallel()测试了它,它正常运行。