如何将Java流转换为滑动窗口?

是否建议将流转换为滑动窗口?

例如,在Ruby中你可以使用each_cons :

irb(main):020:0> [1,2,3,4].each_cons(2) { |x| puts x.inspect } [1, 2] [2, 3] [3, 4] => nil irb(main):021:0> [1,2,3,4].each_cons(3) { |x| puts x.inspect } [1, 2, 3] [2, 3, 4] => nil 

在Guava中,我发现只有Iterators#partition ,它是相关的,但没有滑动窗口:

 final Iterator> partition = Iterators.partition(IntStream.range(1, 5).iterator(), 3); partition.forEachRemaining(System.out::println); --> [1, 2, 3] [4] 

API中没有这样的function,因为它支持顺序和并行处理,并且很难为任意流源提供滑动窗口函数的有效并行处理(即使有效的对并行处理也很难,我实现了它,所以我知道)。

但是,如果您的源是具有快速随机访问权限的List ,则可以使用subList()方法获取所需的行为,如下所示:

 public static  Stream> sliding(List list, int size) { if(size > list.size()) return Stream.empty(); return IntStream.range(0, list.size()-size+1) .mapToObj(start -> list.subList(start, start+size)); } 

类似的方法实际上可以在我的StreamEx库中使用:请参阅StreamEx.ofSubLists()

还有一些其他第三方解决方案不关心并行处理,并使用一些内部缓冲区提供滑动function。 例如,protonpack StreamUtils.windowed

如果您愿意使用第三方库而不需要并行性,那么jOOλ提供SQL风格的窗口函数如下

 int n = 2; System.out.println( Seq.of(1, 2, 3, 4) .window(0, n - 1) .filter(w -> w.count() == n) .map(w -> w.window().toList()) .toList() ); 

生产

 [[1, 2], [2, 3], [3, 4]] 

 int n = 3; System.out.println( Seq.of(1, 2, 3, 4) .window(0, n - 1) .filter(w -> w.count() == n) .map(w -> w.window().toList()) .toList() ); 

生产

 [[1, 2, 3], [2, 3, 4]] 

这是一篇关于它是如何工作的博客文章 。

免责声明:我为jOOλ背后的公司工作

另一个选项cyclops -react建立在jOOλ的Seq接口(和JDK 8 Stream)之上,但是simple-react建立了并发/并行性(如果这对您很重要 – 通过创建Streams of Futures)。

你可以使用Lukas强大的窗口函数和任何一个库(因为我们扩展了令人敬畏的jOOλ),但也有一个滑动算子,我认为在这种情况下简化了事情,适合在无限流中使用(即它不消耗流,但在它们流过时缓冲值)。

使用ReactiveSeq,它看起来像这样 –

 ReactiveSeq.of(1, 2, 3, 4) .sliding(2) .forEach(System.out::println); 

使用LazyFutureStream可能看起来像下面的示例 –

  LazyFutureStream.iterate(1,i->i+1) .sliding(3,2) //lists of 3, increment 2 .forEach(System.out::println); 

在cyclops-streams StreamUtils类中还提供了用于在java.util.stream.Stream上创建滑动视图的Equivalant静态方法。

  StreamUtils.sliding(Stream.of(1,2,3,4),2) .map(Pair::new); 

如果您想直接使用每个滑动视图,可以使用返回List Transformer的slidingT运算符。 例如,要为每个滑动视图中的每个元素添加一个数字,然后将每个滑动窗口减少为我们可以执行的元素之和: –

  ReactiveSeq windowsSummed = ReactiveSeq.fromIterable(data) .slidingT(3) .map(a->a+toAdd) .reduce(0,(a,b)->a+b) .stream(); 

免责声明:我为独眼巨人反应后的公司工作

如果你想将Scala的持久集合的全部function带到Java,你可以使用库Javaslang 。

 // this imports List, Stream, Iterator, ... import javaslang.collection.*; Iterator.range(1, 5).sliding(3) .forEach(System.out::println); // ---> // List(1, 2, 3) // List(2, 3, 4) Iterator.range(1, 5).sliding(2, 3) .forEach(System.out::println); // ---> // List(1, 2) // List(4) Iterator.ofAll(javaStream).sliding(3); 

您可能不仅使用Iterator,这也适用于几乎任何其他Javaslang集合:Array,Vector,List,Stream,Queue,HashSet,LinkedHashSet,TreeSet,…

在此处输入图像描述

(概述Javaslang 2.1.0-alpha)

免责声明:我是Javaslang的创造者

我在Tomek的Nurkiewicz博客( https://www.nurkiewicz.com/2014/07/grouping-sampling-and-batching-custom.html )上找到了解决方案。 您可以使用以下SlidingCollector

 public class SlidingCollector implements Collector>, List>> { private final int size; private final int step; private final int window; private final Queue buffer = new ArrayDeque<>(); private int totalIn = 0; public SlidingCollector(int size, int step) { this.size = size; this.step = step; this.window = max(size, step); } @Override public Supplier>> supplier() { return ArrayList::new; } @Override public BiConsumer>, T> accumulator() { return (lists, t) -> { buffer.offer(t); ++totalIn; if (buffer.size() == window) { dumpCurrent(lists); shiftBy(step); } }; } @Override public Function>, List>> finisher() { return lists -> { if (!buffer.isEmpty()) { final int totalOut = estimateTotalOut(); if (totalOut > lists.size()) { dumpCurrent(lists); } } return lists; }; } private int estimateTotalOut() { return max(0, (totalIn + step - size - 1) / step) + 1; } private void dumpCurrent(List> lists) { final List batch = buffer.stream().limit(size).collect(toList()); lists.add(batch); } private void shiftBy(int by) { for (int i = 0; i < by; i++) { buffer.remove(); } } @Override public BinaryOperator>> combiner() { return (l1, l2) -> { throw new UnsupportedOperationException("Combining not possible"); }; } @Override public Set characteristics() { return EnumSet.noneOf(Characteristics.class); } } 

下面是Tomekin Spock的一些例子(我希望它是可读的):

 import static com.nurkiewicz.CustomCollectors.sliding @Unroll class CustomCollectorsSpec extends Specification { def "Sliding window of #input with size #size and step of 1 is #output"() { expect: input.stream().collect(sliding(size)) == output where: input | size | output [] | 5 | [] [1] | 1 | [[1]] [1, 2] | 1 | [[1], [2]] [1, 2] | 2 | [[1, 2]] [1, 2] | 3 | [[1, 2]] 1..3 | 3 | [[1, 2, 3]] 1..4 | 2 | [[1, 2], [2, 3], [3, 4]] 1..4 | 3 | [[1, 2, 3], [2, 3, 4]] 1..7 | 3 | [[1, 2, 3], [2, 3, 4], [3, 4, 5], [4, 5, 6], [5, 6, 7]] 1..7 | 6 | [1..6, 2..7] } def "Sliding window of #input with size #size and no overlapping is #output"() { expect: input.stream().collect(sliding(size, size)) == output where: input | size | output [] | 5 | [] 1..3 | 2 | [[1, 2], [3]] 1..4 | 4 | [1..4] 1..4 | 5 | [1..4] 1..7 | 3 | [1..3, 4..6, [7]] 1..6 | 2 | [[1, 2], [3, 4], [5, 6]] } def "Sliding window of #input with size #size and some overlapping is #output"() { expect: input.stream().collect(sliding(size, 2)) == output where: input | size | output [] | 5 | [] 1..4 | 5 | [[1, 2, 3, 4]] 1..7 | 3 | [1..3, 3..5, 5..7] 1..6 | 4 | [1..4, 3..6] 1..9 | 4 | [1..4, 3..6, 5..8, 7..9] 1..10 | 4 | [1..4, 3..6, 5..8, 7..10] 1..11 | 4 | [1..4, 3..6, 5..8, 7..10, 9..11] } def "Sliding window of #input with size #size and gap of #gap is #output"() { expect: input.stream().collect(sliding(size, size + gap)) == output where: input | size | gap | output [] | 5 | 1 | [] 1..9 | 4 | 2 | [1..4, 7..9] 1..10 | 4 | 2 | [1..4, 7..10] 1..11 | 4 | 2 | [1..4, 7..10] 1..12 | 4 | 2 | [1..4, 7..10] 1..13 | 4 | 2 | [1..4, 7..10, [13]] 1..13 | 5 | 1 | [1..5, 7..11, [13]] 1..12 | 5 | 3 | [1..5, 9..12] 1..13 | 5 | 3 | [1..5, 9..13] } def "Sampling #input taking every #nth th element is #output"() { expect: input.stream().collect(sliding(1, nth)) == output where: input | nth | output [] | 1 | [] [] | 5 | [] 1..3 | 5 | [[1]] 1..6 | 2 | [[1], [3], [5]] 1..10 | 5 | [[1], [6]] 1..100 | 30 | [[1], [31], [61], [91]] } } 

另一种选择是实现自定义Spliterator,就像在这里完成一样:

 import java.util.*; public class SlidingWindowSpliterator implements Spliterator> { static  Stream> windowed(Collection stream, int windowSize) { return StreamSupport.stream( new SlidingWindowSpliterator<>(stream, windowSize), false); } private final Queue buffer; private final Iterator sourceIterator; private final int windowSize; private final int size; private SlidingWindowSpliterator(Collection source, int windowSize) { this.buffer = new ArrayDeque<>(windowSize); this.sourceIterator = Objects.requireNonNull(source).iterator(); this.windowSize = windowSize; this.size = calculateSize(source, windowSize); } @Override public boolean tryAdvance(Consumer> action) { if (windowSize < 1) { return false; } while (sourceIterator.hasNext()) { buffer.add(sourceIterator.next()); if (buffer.size() == windowSize) { action.accept(Arrays.stream((T[]) buffer.toArray(new Object[0]))); buffer.poll(); return sourceIterator.hasNext(); } } return false; } @Override public Spliterator> trySplit() { return null; } @Override public long estimateSize() { return size; } @Override public int characteristics() { return ORDERED | NONNULL | SIZED; } private static int calculateSize(Collection source, int windowSize) { return source.size() < windowSize ? 0 : source.size() - windowSize + 1; } }