Java Streams – 如何在每第n个项目中执行中间函数

我正在寻找一个Stream上的操作,使我能够每隔n项执行一次非终端(和/或终端)操作。 虽然我使用素数流,例如,流可以很容易地生成网络请求,用户操作或其他一些冷数据或实时源。

由此:

Duration start = Duration.ofNanos(System.nanoTime()); IntStream.iterate(2, n -> n + 1) .filter(Findprimes::isPrime) .limit(1_000_1000 * 10) .forEach(System.out::println); System.out.println("Duration: " + Duration.ofNanos(System.nanoTime()).minus(start)); 

对于像这样的流函数:

  IntStream.iterate(2, n -> n + 1) .filter(Findprimes::isPrime) .limit(1_000_1000 * 10) .peekEvery(10, System.out::println) .forEach( it -> {}); 

创建一个帮助方法来包装peek()使用者:

 public static IntConsumer every(int count, IntConsumer consumer) { if (count <= 0) throw new IllegalArgumentException("Count must be >1: Got " + count); return new IntConsumer() { private int i; @Override public void accept(int value) { if (++this.i == count) { consumer.accept(value); this.i = 0; } } }; } 

您现在几乎可以按照自己的意愿使用它:

 IntStream.rangeClosed(1, 20) .peek(every(5, System.out::println)) .count(); 

产量

 5 10 15 20 

辅助方法可以放在实用程序类中并静态导入,类似于Collectors类只是静态辅助方法。

正如注释中@ user140547所述,此代码不是线程安全的,因此不能与并行流一起使用。 此外,输出顺序会混乱,所以无论如何将它与并行流一起使用并不是真的有意义。

依赖peek()count()并不是一个好主意,因为如果可以在不经过整个流的情况下计算count() 则根本不会调用该操作 。 即使它现在有效,但这并不意味着它将来也会起作用。 请参阅Java 9中的Stream.count()的javadoc 。

更好地使用forEach()

对于问题本身:在像简单迭代这样的特殊情况下,你可以像过滤一样对象。

 Stream.iterate(2, n->n+1) .limit(20) .filter(n->(n-2)%5==0 && n!=2) .forEach(System.out::println); 

这当然不适用于您可能使用有状态IntConsumer其他情况。 如果使用iterate() ,那么无论如何使用并行流可能没那么有用。

如果你想要一个通用的解决方案,你也可以尝试使用“普通” Stream ,它可能没有IntStream那么高效,但在很多情况下仍然应该足够:

 class Tuple{ // ctor, getter/setter omitted int index; int value; } 

然后你可以这样做:

 Stream.iterate( new Tuple(1,2),t-> new Tuple(t.index+1,t.value*2)) .limit(30) .filter(t->t.index %5 == 0) .forEach(System.out::println); 

如果你必须使用peek() ,你也可以这样做

.peek(t->{if (t.index %5 == 0) System.out.println(t);})

或者如果你添加方法

 static Tuple initialTuple(int value){ return new Tuple(1,value); } static UnaryOperator createNextTuple(IntUnaryOperator f){ return current -> new Tuple(current.index+1,f.applyAsInt(current.value)); } static Consumer every(int n,IntConsumer consumer){ return tuple -> {if (tuple.index % n == 0) consumer.accept(tuple.value);}; } 

你也可以(使用静态导入):

  Stream.iterate( initialTuple(2), createNextTuple(x->x*2)) .limit(30) .peek(every(5,System.out::println)) .forEach(System.out::println); 

尝试这个。

 int[] counter = {0}; long result = IntStream.iterate(2, n -> n + 1) .filter(Findprimes::isPrime) .limit(100) .peek(x -> { if (counter[0]++ % 10 == 0) System.out.print(x + " ");} ) .count(); 

结果:

 2 31 73 127 179 233 283 353 419 467