在一次通过中执行多次减少
在单个流传递中执行多个减少的习惯用法是什么? 是否只有一个大的减速器类,即使这违反了SRP,如果需要多种类型的减少计算?
大概你想避免多次通过,因为管道阶段可能很昂贵。 或者您希望避免收集中间值以便通过多个收集器运行它们,因为存储所有值的成本可能太高。
正如Brian Goetz所指出的 , Collectors.summarizingInt
将收集int
值并对它们执行多次约简,返回一个名为IntSummaryStatistics
的聚合结构。 有类似的收集器用于汇总double
和long
值。
不幸的是,它们只执行一组固定的减少,所以如果你想减少与它们不同的减少,你必须编写自己的收集器。
这是一种在一次通过中使用多个不相关的收集器的技术。 我们可以使用peek()
对流经流中的每个值进行破解,使其通过不受干扰。 peek()
操作需要一个Consumer
,因此我们需要一种方法来使Collector
适应Consumer
。 Consumer
将成为Collector的累加器函数。 但是我们还需要调用收集器的供应商函数并存储它创建的对象以传递给累加器函数。 我们需要一种方法将结果从收集器中取出。 为此,我们将收集器包装在一个小帮助器类中:
public class PeekingCollector { final Collector collector; final A acc; public PeekingCollector(Collector collector) { this.collector = collector; this.acc = collector.supplier().get(); } public Consumer peek() { if (collector.characteristics().contains(Collector.Characteristics.CONCURRENT)) return t -> collector.accumulator().accept(acc, t); else return t -> { synchronized (this) { collector.accumulator().accept(acc, t); } }; } public synchronized R get() { return collector.finisher().apply(acc); } }
要使用它,我们首先必须创建包装的收集器并挂起它。 然后我们运行管道并调用peek
,传递包装的收集器。 最后,我们调用包装的收集器来获取结果。 这是一个简单的例子,用于过滤和排序某些单词,同时也按首字母对它们进行分组:
List input = Arrays.asList( "aardvark", "crocodile", "antelope", "buffalo", "bustard", "cockatoo", "capybara", "bison", "alligator"); PeekingCollector>> grouper = new PeekingCollector<>(groupingBy(s -> s.substring(0, 1))); List output = input.stream() .filter(s -> s.length() > 5) .peek(grouper.peek()) .sorted() .collect(toList()); Map> groups = grouper.get(); System.out.println(output); System.out.println(groups);
输出是:
[aardvark, alligator, antelope, buffalo, bustard, capybara, cockatoo, crocodile] {a=[aardvark, antelope, alligator], b=[buffalo, bustard], c=[crocodile, cockatoo, capybara]}
这有点麻烦,因为你必须写出包装收集器的generics类型(这有点不寻常;它们通常都是推断的)。 但是,如果处理或存储流值的费用足够大,也许值得麻烦。
最后请注意,如果流并行运行,则可以从多个线程调用peek()
。 因此,非线程安全的收集器必须由synchronized
块保护。 如果收集器是线程安全的,我们不需要在调用它时进行同步。 为了确定这一点,我们检查收集器的CONCURRENT
特性。 如果您运行并行流,则最好在peek
操作中放置并发收集器(例如groupingByConcurrent
或toConcurrentMap
),否则包装收集器内的同步可能会导致瓶颈并使整个流速变慢。