Partition a Stream by a discriminator function

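One of the features missing from the Streams API is a "partition by" transformation, such as the one defined in Clojure. Say I want to reproduce Hibernate's fetch join: I want to issue a single SQL SELECT statement and receive this kind of object from the result: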

    class Family {
        String surname;
        List<String> members;
    }

I issue:

    SELECT f.name, m.name
    FROM Family f JOIN Member m on m.family_id = f.id
    ORDER BY f.name

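and I retrieve a flat stream of (f.name, m.name) records. Now I need to turn it into a stream of Family objects, each containing the list of its members. Assume I already have a Stream<ResultRow>; I need to transform it into a Stream<List<ResultRow>> and then apply a mapping transformation that turns it into a Stream<Family>.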

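The semantics of the transformation are as follows: keep collecting the stream into a List for as long as the provided discriminator function keeps returning the same value; as soon as the value changes, emit the List as an element of the output stream and start collecting a new List.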
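To make these semantics concrete, here is a minimal eager sketch that works on a List rather than a Stream. It is only a reference for the grouping rule, not the lazy solution the question asks for; the class name `PartitionSemantics` and the sample data are made up for illustration.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Objects;
import java.util.function.Function;

class PartitionSemantics {
    // Eager reference version (assumption: the input fits in memory).
    // Splits the input into runs of consecutive elements that share
    // the same discriminator value.
    static <E> List<List<E>> partitionBy(Function<? super E, ?> discriminator, List<E> input) {
        List<List<E>> result = new ArrayList<>();
        List<E> current = null;
        Object currentKey = null;
        for (E e : input) {
            Object key = discriminator.apply(e);
            // Start a new run on the first element or whenever the key changes.
            if (current == null || !Objects.equals(key, currentKey)) {
                current = new ArrayList<>();
                result.add(current);
                currentKey = key;
            }
            current.add(e);
        }
        return result;
    }

    public static void main(String[] args) {
        List<String> words = Arrays.asList("apple", "avocado", "banana", "blueberry", "apricot");
        // Runs are formed by adjacency only, so "apricot" starts a third list.
        System.out.println(partitionBy(w -> w.charAt(0), words));
        // prints [[apple, avocado], [banana, blueberry], [apricot]]
    }
}
```

Note that equal keys that are not adjacent end up in separate lists, which is why the SQL query sorts by f.name.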

I would like to be able to write this kind of code (I already have the resultStream method):

    Stream<ResultRow> dbStream = resultStream(queryBuilder.createQuery(
            "SELECT f.name, m.name"
          + " FROM Family f JOIN Member m on m.family_id = f.id"
          + " ORDER BY f.name"));
    Stream<List<ResultRow>> partitioned = partitionBy(r -> r.string(0), dbStream);
    Stream<Family> families = partitioned.map(rs -> {
        Family f = new Family(rs.get(0).string(0));
        f.members = rs.stream().map(r -> r.string(1)).collect(toList());
        return f;
    });

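Needless to say, I expect the resulting stream to stay lazy (non-materialized), because I want to be able to process a result set of any size without hitting an O(n) memory limit. Without this crucial requirement I would be happy with the provided groupingBy collector.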

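The solution requires us to define a custom Spliterator that can be used to construct the partitioned stream. We need to access the input stream through its own spliterator and wrap that spliterator in ours. The output stream is then constructed from our custom spliterator.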

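Given a Function<E, ?> as the discriminator function, the following Spliterator turns any Stream<E> into a Stream<List<E>>. Note that the input stream must be ordered for this operation to make sense.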

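    import static java.util.Comparator.naturalOrder;
    import static java.util.Spliterator.NONNULL;
    import static java.util.Spliterator.SIZED;
    import static java.util.Spliterator.SUBSIZED;

    import java.util.ArrayList;
    import java.util.Comparator;
    import java.util.List;
    import java.util.Objects;
    import java.util.Optional;
    import java.util.Spliterator;
    import java.util.Spliterators.AbstractSpliterator;
    import java.util.function.Consumer;
    import java.util.function.Function;
    import java.util.stream.Stream;
    import java.util.stream.StreamSupport;

    public class PartitionBySpliterator<E> extends AbstractSpliterator<List<E>> {
      private final Spliterator<E> spliterator;
      private final Function<? super E, ?> partitionBy;
      // Holds the first element of the next partition between calls to tryAdvance.
      private HoldingConsumer<E> holder;
      private Comparator<List<E>> comparator;

      public PartitionBySpliterator(Spliterator<E> toWrap, Function<? super E, ?> partitionBy) {
        // The number of partitions is unknown, so drop SIZED (and SUBSIZED, which requires SIZED).
        super(Long.MAX_VALUE, toWrap.characteristics() & ~(SIZED | SUBSIZED) | NONNULL);
        this.spliterator = toWrap;
        this.partitionBy = partitionBy;
      }

      public static <E> Stream<List<E>> partitionBy(Function<E, ?> partitionBy, Stream<E> in) {
        return StreamSupport.stream(new PartitionBySpliterator<>(in.spliterator(), partitionBy), false);
      }

      @Override public boolean tryAdvance(Consumer<? super List<E>> action) {
        final HoldingConsumer<E> h;
        if (holder == null) {
          // First call (or input exhausted): try to pull an element from the source.
          h = new HoldingConsumer<>();
          if (!spliterator.tryAdvance(h)) return false;
          holder = h;
        }
        else h = holder;
        final ArrayList<E> partition = new ArrayList<>();
        final Object partitionKey = partitionBy.apply(h.value);
        boolean didAdvance;
        // Collect elements for as long as the discriminator returns the same key.
        do partition.add(h.value);
        while ((didAdvance = spliterator.tryAdvance(h))
            && Objects.equals(partitionBy.apply(h.value), partitionKey));
        if (!didAdvance) holder = null;
        action.accept(partition);
        return true;
      }

      static final class HoldingConsumer<T> implements Consumer<T> {
        T value;
        @Override public void accept(T value) { this.value = value; }
      }

      @Override public Comparator<? super List<E>> getComparator() {
        final Comparator<List<E>> c = this.comparator;
        return c != null? c : (this.comparator = comparator());
      }

      // Orders partitions by their first element, then by their last element.
      private Comparator<List<E>> comparator() {
        @SuppressWarnings({"unchecked","rawtypes"})
        final Comparator<? super E> innerComparator =
            Optional.ofNullable(spliterator.getComparator())
                    .orElse((Comparator) naturalOrder());
        return (left, right) -> {
          final int c = innerComparator.compare(left.get(0), right.get(0));
          return c != 0? c : innerComparator.compare(
              left.get(left.size() - 1), right.get(right.size() - 1));
        };
      }
    }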
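To illustrate the laziness requirement, here is a compact, self-contained sketch that runs the same "emit a list when the key changes" rule over an infinite stream. It deliberately swaps in a different technique, wrapping an Iterator instead of writing a Spliterator, so that the example fits in one short block; the names `LazyPartitionDemo`, `partitionByIterator` and `firstRuns` are hypothetical.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.NoSuchElementException;
import java.util.Objects;
import java.util.Spliterator;
import java.util.Spliterators;
import java.util.function.Function;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import java.util.stream.StreamSupport;

class LazyPartitionDemo {
    // Hypothetical helper: Iterator-based variant of the partition-by idea.
    static <E> Stream<List<E>> partitionByIterator(Function<? super E, ?> discriminator, Stream<E> in) {
        Iterator<E> source = in.iterator();
        Iterator<List<E>> runs = new Iterator<List<E>>() {
            private E pending;          // first element of the next run, if any
            private boolean hasPending;

            @Override public boolean hasNext() {
                if (!hasPending && source.hasNext()) {
                    pending = source.next();
                    hasPending = true;
                }
                return hasPending;
            }

            @Override public List<E> next() {
                if (!hasNext()) throw new NoSuchElementException();
                List<E> run = new ArrayList<>();
                Object key = discriminator.apply(pending);
                run.add(pending);
                hasPending = false;
                // Pull elements only until the key changes; never read further ahead.
                while (source.hasNext()) {
                    E e = source.next();
                    if (!Objects.equals(discriminator.apply(e), key)) {
                        pending = e;
                        hasPending = true;
                        break;
                    }
                    run.add(e);
                }
                return run;
            }
        };
        return StreamSupport.stream(
            Spliterators.spliteratorUnknownSize(runs, Spliterator.ORDERED | Spliterator.NONNULL),
            false);
    }

    // Takes the first n runs of the infinite sequence 0, 1, 2, ... keyed by i / 3.
    static List<List<Integer>> firstRuns(int n) {
        return partitionByIterator(i -> i / 3, Stream.iterate(0, i -> i + 1))
            .limit(n)
            .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        System.out.println(firstRuns(3));
        // prints [[0, 1, 2], [3, 4, 5], [6, 7, 8]]
    }
}
```

The call terminates even though the source is infinite, because each run reads at most one element past its own end. At any time only the current partition is held in memory.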

For those of you who just want to partition a stream, there are mappers and collectors for that.

    import java.util.Arrays;
    import java.util.List;
    import java.util.stream.Collectors;
    import java.util.stream.Stream;

    class Person {
        String surname;
        String forename;

        public Person(String surname, String forename) {
            this.surname = surname;
            this.forename = forename;
        }

        @Override
        public String toString() {
            return forename;
        }
    }

    class Family {
        String surname;
        List<Person> members;

        public Family(String surname, List<Person> members) {
            this.surname = surname;
            this.members = members;
        }

        @Override
        public String toString() {
            return "Family{" + "surname=" + surname + ", members=" + members + '}';
        }
    }

    private void test() {
        String[][] data = {
            {"Kray", "Ronald"},
            {"Kray", "Reginald"},
            {"Dors", "Diana"},};
        // Their families.
        Stream<Family> families = Arrays.stream(data)
                // Build people
                .map(a -> new Person(a[0], a[1]))
                // Collect into a Map<String, List<Person>> as families
                .collect(Collectors.groupingBy(p -> p.surname))
                // Convert them to families.
                .entrySet().stream()
                .map(p -> new Family(p.getKey(), p.getValue()));
        families.forEach(f -> System.out.println(f));
    }
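One detail worth noting about the groupingBy approach: the single-argument overload makes no guarantee about the type of map it returns, so the families may come out in a different order than the input. A small sketch, assuming encounter order matters, passes `LinkedHashMap::new` as the map factory; the class name `OrderedGroupingDemo` and the method `membersBySurname` are illustrative only. Like the answer above, this materializes the whole input in memory.

```java
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;

class OrderedGroupingDemo {
    // Groups forenames (column 1) by surname (column 0),
    // keeping surnames in the order they first appear.
    static Map<String, List<String>> membersBySurname(String[][] rows) {
        return Arrays.stream(rows)
            .collect(Collectors.groupingBy(
                row -> row[0],
                LinkedHashMap::new,   // preserves insertion order of keys
                Collectors.mapping(row -> row[1], Collectors.toList())));
    }

    public static void main(String[] args) {
        String[][] data = {
            {"Kray", "Ronald"},
            {"Kray", "Reginald"},
            {"Dors", "Diana"}};
        System.out.println(membersBySurname(data));
        // prints {Kray=[Ronald, Reginald], Dors=[Diana]}
    }
}
```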

It can be done with collapse from StreamEx:

    StreamEx.of(queryBuilder.createQuery(
            "SELECT f.name, m.name"
          + " FROM Family f JOIN Member m on m.family_id = f.id"
          + " ORDER BY f.name"))
        .collapse((a, b) -> a.string(0).equals(b.string(0)), Collectors.toList())
        .map(l -> new Family(l.get(0).string(0),
                             StreamEx.of(l).map(r -> r.string(1)).toList()))
        .forEach(System.out::println);