流状态计算:累积总和
假设我有一个Java IntStream,是否可以将它转换为具有累积总和的IntStream? 例如,以[4,2,6,…]开头的流应转换为[4,6,12,…]。
更一般地说,应该如何实现有状态流操作? 感觉这应该是可能的:
myIntStream.map(new Function { int sum = 0; Integer apply(Integer value){ return sum += value; } );
有明显的限制,这只适用于顺序流。 但是,Stream.map明确需要无状态映射函数。 我是否正确错过了Stream.statefulMap或Stream.cumulative操作,还是缺少Java流的重点?
比较一下Haskell,其中scanl1函数正好解决了这个例子:
scanl1 (+) [1 2 3 4] = [1 3 6 10]
你可以用primefaces序数做到这一点。 例如:
import java.util.concurrent.atomic.AtomicLong; import java.util.stream.IntStream; import java.util.stream.LongStream; public class Accumulator { public static LongStream toCumulativeSumStream(IntStream ints){ AtomicLong sum = new AtomicLong(0); return ints.sequential().mapToLong(sum::addAndGet); } public static void main(String[] args){ LongStream sums = Accumulator.toCumulativeSumStream(IntStream.range(1, 5)); sums.forEachOrdered(System.out::println); } }
这输出:
1 3 6 10
我使用Long来存储总和,因为完全有可能两个整数加起来远远超过Integer.MAX_VALUE
,并且long很少有溢出的可能性。
可以使用收集器然后创建新流:
class Accumulator { public static void accept(List list, Integer value) { list.add(value + (list.isEmpty() ? 0 : list.get(list.size() - 1))); } public static List combine(List list1, List list2) { int total = list1.get(list1.size() - 1); list2.stream().map(n -> n + total).forEach(list1::add); return list1; } }
这用作:
myIntStream.parallel() .collect(ArrayList::new, Accumulator::accept, Accumulator::combine) .stream();
希望你可以看到这个收集器的重要属性是,即使流是并行的,因为Accumulator
实例被组合,它会调整总数。
这显然不如映射操作有效,因为它收集整个流然后生成新流。 但这不仅仅是一个实现细节:它是流可能同时处理的事实的必要function。
我用IntStream.range(0, 10000).parallel()
测试了它,它正常运行。