是否有可能在java 8中执行一个惰性groupby,返回一个流?

我有一些大型的文本文件,我想通过对其行进行分组来处理。

我尝试使用新的流媒体function,比如

return FileUtils.readLines(...) .parallelStream() .map(...) .collect(groupingBy(pair -> pair[0])); 

问题是,AFAIK,这会生成一个Map。

有没有办法像上面那样产生高级代码,例如,一个条目流?

更新 :我正在寻找的是像python的itertools.groupby 。 我的文件已经排序(通过pair [0]),我只想逐个加载组。

我已经有了一个迭代解决方案。 我只是想知道是否有更多的声明方式来做到这一点。 顺便说一句,使用番石榴或其他第三方图书馆不会是一个大问题。

您想要实现的任务与分组的任务完全不同。 groupingBy不依赖于Stream元素的顺序,而是依赖于Map的算法应用于分类器Function的结果。

您想要的是将具有公共属性值的相邻项折叠到一个List项中。 只要您可以保证具有相同属性值的所有项都被聚类,甚至不必将Stream按该属性排序。

也许有可能将此任务表示为减少,但对我来说,结果结构看起来太复杂了。

因此,除非将对此function的直接支持添加到Stream ,否则基于迭代器的方法对我来说看起来最实用:

 class Folding implements Spliterator>> { static  Stream>> foldBy( Stream s, Function f) { return StreamSupport.stream(new Folding<>(s.spliterator(), f), false); } private final Spliterator source; private final Function pf; private final Consumer c=this::addItem; private List pending, result; private G pendingGroup, resultGroup; Folding(Spliterator s, Function f) { source=s; pf=f; } private void addItem(T item) { G group=pf.apply(item); if(pending==null) pending=new ArrayList<>(); else if(!pending.isEmpty()) { if(!Objects.equals(group, pendingGroup)) { if(pending.size()==1) result=Collections.singletonList(pending.remove(0)); else { result=pending; pending=new ArrayList<>(); } resultGroup=pendingGroup; } } pendingGroup=group; pending.add(item); } public boolean tryAdvance(Consumer>> action) { while(source.tryAdvance(c)) { if(result!=null) { action.accept(entry(resultGroup, result)); result=null; return true; } } if(pending!=null) { action.accept(entry(pendingGroup, pending)); pending=null; return true; } return false; } private Map.Entry> entry(G g, List l) { return new AbstractMap.SimpleImmutableEntry<>(g, l); } public int characteristics() { return 0; } public long estimateSize() { return Long.MAX_VALUE; } public Spliterator>> trySplit() { return null; } } 

通过将其应用于无限流,可以最好地certificate所得到的折叠Stream的惰性:

 Folding.foldBy(Stream.iterate(0, i->i+1), i->i>>4) .filter(e -> e.getKey()>5) .findFirst().ifPresent(e -> System.out.println(e.getValue())); 

独眼巨人反应 ,我的图书馆贡献,提供可能做你想要的分片和分组function。

  ReactiveSeq> grouped = ReactiveSeq.fromCollection(FileUtils.readLines(...) ) .groupedStatefullyWhile((batch,next) -> batch.size()==0 ? true : next.equals(batch.get(0))); 

groupedStatefullyWhile运算符允许根据批处理的当前状态对元素进行分组。 ReactiveSeq是单线程顺序流。

  Map sharded = new LazyReact() .fromCollection(FileUtils.readLines(...) ) .map(..) .shard(shards, pair -> pair[0]); 

这将创建一个LazyFutureStream(实现java.util.stream.Stream),它将异步并行地处理文件中的数据。 它是懒惰的,在数据​​通过之前不会开始处理。

唯一需要注意的是,您需要事先定义分片。 即上面的’shards’参数是async.Queue的地图,该地图由分片的关键字键入(可能是对[0]是什么?)。

例如

 Map> shards; 

这里有一个关于video和测试代码 的分片示例

它可以通过StreamEx collapse来完成

 final int[][] aa = { { 1, 1 }, { 1, 2 }, { 2, 2 }, { 2, 3 }, { 3, 3 }, { 4, 4 } }; StreamEx.of(aa) .collapse((a, b) -> a[0] == b[0], Collectors.groupingBy(a -> a[0])) .forEach(System.out::println); 

我们可以添加peeklimit来validation它是否是惰性计算:

 StreamEx.of(aa) .peek(System.out::println) .collapse((a, b) -> a[0] == b[0], Collectors.groupingBy(a -> a[0])) .limit(1) .forEach(System.out::println);