从一个长流创建流的流

我想根据Streams的内容将单个Stream拆分为Streams Streams 。 生成的Stream应包含部分原始流的数据。

我的实际应用程序更复杂(它将日志行分组在时间间隔列表中),但我的问题是如何处理流,所以在这里我要问一个简化的例子。

示例问题

我希望能够根据重复的相同数字将Stream拆分为Stream<Stream> ,只留下奇数的流。

例如,以下流包含:

{1,1,1,2,2,2,3,6,7,7,1,1}

需要产生包含以下内容的流的流:

{{1,1,1},{3},{7,7},{1,1}}

通过使用filter开始(或结束),我可以做出偶数数字:

 Stream input = ...; Straem<Stream> output = input.filter(this::isOdd).someOtherOperation(); 

这是不希望的,因为它意味着两次评估每个输入值,这是可以接受的,但我宁愿避免这种情况。

解决方案的想法

我当前的解决方案是迭代流的内容并创建List<List>并将其转换为Stream<Stream> 。 但是这意味着完整的结果保存在内存中(这对我的应用程序来说是不受欢迎的)。

我也认为我可以通过编写从流中读取的自己的Iterator这一点,但我不确定这是如何工作的。

如何根据原始Stream的内容将Stream转换为Stream of Stream ,而不是首先将完整结果存储为List of Lists

您可能希望实现自己的聚合分裂器来执行此操作。 质子包库中已有类似的东西(第一个链接重定向到在proton-pack中实现的链接)。

请注意,您将获得Stream> (您可以尝试直接修改实现以获得Stream> ,但是您总是需要缓冲少量元素;具体取决于窗口的大小;以及测试你是否应该创建一个新窗口)。 例如:

 StreamUtils.aggregate(Stream.of(1, 1, 1, 2, 2, 2, 3, 6, 7, 7, 1, 1), Objects::equals) .forEach(System.out::println); 

输出:

 [1, 1, 1] [2, 2, 2] [3] [6] [7, 7] [1, 1] 

您可以使用我的StreamEx库。 它有groupRuns来完成这项工作:

 List input = Arrays.asList(1, 1, 1, 2, 2, 2, 3, 6, 7, 7, 1, 1); Stream> streams = StreamEx.of(input).filter(this::isOdd) .groupRuns(Integer::equals) .map(List::stream); 

用法示例:

 streams.map(s -> StreamEx.of(s).joining(",")).forEach(System.out::println); 

输出:

 1,1,1 3 7,7 1,1 

与protonpack库类似,里面有一个自定义分裂器,但是使用StreamEx可以利用并行处理(protonpack根本不分裂)。

在顺序处理中,一次最多只有一个中间列表驻留在内存中(其他中间列表适用于GC)。 如果您仍然担心内存消耗(例如,您有很长的组),则可以使用另一种方法来解决此任务,因为StreamEx 0.3.3:

 Stream> streams = StreamEx.of(input).filter(this::isOdd) .runLengths() .mapKeyValue(StreamEx::constant); 

runLengths方法返回条目流,其中key是元素,value是相邻重复元素的数量。 之后使用StreamEx.constant ,它是Stream.generate(() -> value).limit(length)快捷方式。 因此,即使对于很长的组,您也会有一个恒定的中间内存消耗。 当然这个版本也是并行友好的。

更新: StreamEx 0.3.3已发布,因此第二个解决方案现在也符合条件。

我担心这是不可行的,至少不是很好的方式。 即使您将元素映射到流中并减少它们,这些内部流也必须知道它们包含哪些元素,因此它们必须存储一些内容。

最简单的解决方案是使用groupingBy但是它会将所有结果存储在地图中:

 List input = asList(1, 1, 1, 2, 2, 2, 3, 6, 7, 7, 1, 1); Map> grouped = input.stream().collect(groupingBy(i -> i)); Stream> streamOfStreams = grouped.values().stream().map(list -> list.stream()); 

您可以尝试使用reduce操作,但它需要您实现自己的Stream of Streams,您必须在其中存储每个流包含的元素。 更不用说实施它需要付出很多努力。

在您的情况下,我能想到的最佳解决方案是迭代列表两次:

 public static void main(String[] args) { List input = asList(1, 1, 1, 2, 2, 2, 3, 6, 7, 7, 1, 1); input.stream().distinct().filter(i -> isOdd(i)).forEach(i -> { List subList = input.stream().filter(j -> Objects.equals(j, i)).collect(toList()); System.out.println(subList); // do something with the stream instead of collecting to list }); } private static boolean isOdd(Integer i) { return (i & 1) == 1; } 

但请注意,它具有O(n^2)时间复杂度。

编辑:

此解决方案仅包含本地元素组。 它仅存储当前本地组。

 public static void main(String[] args) { Stream input = Stream.of(1, 1, 1, 2, 2, 2, 3, 6, 7, 7, 1, 1); Iterator iterator = input.iterator(); int first; int second = iterator.next(); List buffer = new ArrayList<>(); buffer.add(second); do { first = second; second = iterator.next(); if (Objects.equals(first, second)) { buffer.add(second); } else { doSomethingWithTheGroup(buffer); buffer = new ArrayList<>(); // let GC remove the previous buffer buffer.add(second); } } while (iterator.hasNext()); doSomethingWithTheGroup(buffer); } private static void doSomethingWithTheGroup(List buffer) { System.out.println(buffer); } private static boolean isOdd(Integer i) { return (i & 1) == 1; } 

输出:

 [1, 1, 1] [2, 2, 2] [3] [6] [7, 7] [1, 1] 

和@Jaroslaw一样,我也使用Map来保存不同的Streams。 但是,地图将保留从输入构建的Streams并且不会预先收集。 使用Stream.concatStream.of可以向流中添加一个元素:

  Map> streamMap = new HashMap<>(); int[] arr = {1,1,1,2,2,2,3,6,7,7,1,1}; Arrays.stream(arr) .filter(this::isOdd) .forEach(i -> { Stream st = streamMap.get(i); if (st == null) st = Stream.of(i); else st = Stream.concat(st, Stream.of(i)); streamMap.put(i, st); }); streamMap.entrySet().stream().forEach(e -> { System.out.print(e.getKey() + "={"); e.getValue().forEach(System.out::print); System.out.println("}"); }); 

输出:

 1={11111} 3={3} 7={77}