Java 8 Stream,获得头尾

Java 8引入了一个类似于Scala的Stream的Stream类,这是一个function强大的惰性结构,使用它可以非常简洁地执行这样的操作:

def from(n: Int): Stream[Int] = n #:: from(n+1) def sieve(s: Stream[Int]): Stream[Int] = { s.head #:: sieve(s.tail filter (_ % s.head != 0)) } val primes = sieve(from(2)) primes takeWhile(_ < 1000) print // prints all primes less than 1000 

我想知道是否有可能在Java 8中这样做,所以我写了这样的东西:

 IntStream from(int n) { return IntStream.iterate(n, m -> m + 1); } IntStream sieve(IntStream s) { int head = s.findFirst().getAsInt(); return IntStream.concat(IntStream.of(head), sieve(s.skip(1).filter(n -> n % head != 0))); } IntStream primes = sieve(from(2)); 

相当简单,但它产生java.lang.IllegalStateException: stream has already been operated upon or closed因为findFirst()skip()都是Stream上的终端操作,只能执行一次。

我不需要两次使用流,因为我需要的只是流中的第一个数字,其余的是另一个流,即相当于Scala的Stream.headStream.tail 。 Java 8 Stream中是否有可用于实现此目的的方法?

谢谢。

即使你没有不能拆分IntStream ,你的代码也不起作用,因为你是递归地而不是懒惰地调用你的sieve方法。 因此,在查询结果流以获取第一个值之前,您有一个无限递归。

IntStream s拆分为头部和尾部IntStream (尚未使用)是可能的:

 PrimitiveIterator.OfInt it = s.iterator(); int head = it.nextInt(); IntStream tail = IntStream.generate(it::next).filter(i -> i % head != 0); 

在这个地方,你需要一个懒惰地在尾巴上调用sieve的构造。 Stream没有提供; concat期望现有的流实例作为参数,你不能构造一个懒惰地使用lambda表达式调用sieve的流,因为懒惰创建只使用lambda表达式不支持的可变状态。 如果您没有隐藏可变状态的库实现,则必须使用可变对象。 但是一旦你接受了可变状态的要求,解决方案就比你的第一种方法更容易:

 IntStream primes = from(2).filter(i -> p.test(i)).peek(i -> p = p.and(v -> v % i != 0)); IntPredicate p = x -> true; IntStream from(int n) { return IntStream.iterate(n, m -> m + 1); } 

这将以递归方式创建一个filter,但最终无论你是创建一个IntPredicate树还是一个IntStream树(如果它确实有效,就像使用你的IntStream.concat方法一样)并不重要。 如果您不喜欢filter的可变实例字段,则可以将其隐藏在内部类中(但不能在lambda表达式中隐藏…)。

你基本上可以像这样实现它:

 static  Tuple2, Seq> splitAtHead(Stream stream) { Iterator it = stream.iterator(); return tuple(it.hasNext() ? Optional.of(it.next()) : Optional.empty(), seq(it)); } 

在上面的例子中, Tuple2Seq是从jOOλ借来的类型, jOOλ是我们为jOOQ集成测试开发的库。 如果您不想要任何其他依赖项,您可以自己实现它们:

 class Tuple2 { final T1 v1; final T2 v2; Tuple2(T1 v1, T2 v2) { this.v1 = v1; this.v2 = v2; } static  Tuple2 tuple(T1 v1, T2 v2) { return new Tuple<>(v1, v2); } } static  Tuple2, Stream> splitAtHead(Stream stream) { Iterator it = stream.iterator(); return tuple( it.hasNext() ? Optional.of(it.next()) : Optional.empty, StreamSupport.stream(Spliterators.spliteratorUnknownSize( it, Spliterator.ORDERED ), false) ); } 

除了流的头/尾解构之外,下面的解决方案不进行状态突变。

使用IntStream.iterate获得延迟。 Prime类用于保持发电机状态

  import java.util.PrimitiveIterator; import java.util.stream.IntStream; import java.util.stream.Stream; public class Prime { private final IntStream candidates; private final int current; private Prime(int current, IntStream candidates) { this.current = current; this.candidates = candidates; } private Prime next() { PrimitiveIterator.OfInt it = candidates.filter(n -> n % current != 0).iterator(); int head = it.next(); IntStream tail = IntStream.generate(it::next); return new Prime(head, tail); } public static Stream stream() { IntStream possiblePrimes = IntStream.iterate(3, i -> i + 1); return Stream.iterate(new Prime(2, possiblePrimes), Prime::next) .map(p -> p.current); } } 

用法是这样的:

 Stream first10Primes = Prime.stream().limit(10) 

我的StreamEx库现在有headTail()操作来解决问题:

 public static StreamEx sieve(StreamEx input) { return input.headTail((head, tail) -> sieve(tail.filter(n -> n % head != 0)).prepend(head)); } 

headTail方法采用BiFunction ,它将在流终端操作执行期间最多执行一次。 所以这个实现是懒惰的:它在遍历开始之前不计算任何东西,只计算所请求的素数。 BiFunction接收第一个流元素head ,其余元素的tail并且可以以任何方式修改tail 。 您可以将它与预定义输入一起使用:

 sieve(IntStreamEx.range(2, 1000).boxed()).forEach(System.out::println); 

但无限的流也是如此

 sieve(StreamEx.iterate(2, x -> x+1)).takeWhile(x -> x < 1000) .forEach(System.out::println); // Not the primes till 1000, but 1000 first primes sieve(StreamEx.iterate(2, x -> x+1)).limit(1000).forEach(System.out::println); 

还有使用headTail和谓词连接的替代解决方案:

 public static StreamEx sieve(StreamEx input, IntPredicate isPrime) { return input.headTail((head, tail) -> isPrime.test(head) ? sieve(tail, isPrime.and(n -> n % head != 0)).prepend(head) : sieve(tail, isPrime)); } sieve(StreamEx.iterate(2, x -> x+1), i -> true).limit(1000).forEach(System.out::println); 

比较递归解决方案很有意思:它们能够生成多少个素数。

StreamUtils McClean解决方案( StreamUtils

约翰麦克莱恩的解决方案并不是懒惰的:你无法用无限的流来喂它们。 所以我只是通过反复试验找到了最大允许上限( 17793 )(在发生StackOverflowError之后):

 public void sieveTest(){ sieve(IntStream.range(2, 17793).boxed()).forEach(System.out::println); } 

Streamable McClean解决方案( Streamable

 public void sieveTest2(){ sieve(Streamable.range(2, 39990)).forEach(System.out::println); } 

将上限39990以上39990导致StackOverflowError。

@frhack解决方案( LazySeq

 LazySeq ints = integers(2); LazySeq primes = sieve(ints); // sieve method from @frhack answer primes.forEach(p -> System.out.println(p)); 

结果:在素数= 53327之后卡住了大量的堆分配和垃圾收集占用了90%以上。 花了几分钟才从53323提升到53327,所以等待更多似乎是不切实际的。

@vidi解决方案

 Prime.stream().forEach(System.out::println); 

结果:素数= 134417后的StackOverflowError。

我的解决方案(StreamEx)

 sieve(StreamEx.iterate(2, x -> x+1)).forEach(System.out::println); 

结果:素数= 236167后的StackOverflowError。

@frhack解决方案( rxjava

 Observable primes = Observable.from(()->primesStream.iterator()); primes.forEach((x) -> System.out.println(x.toString())); 

结果:素数= 367663后的StackOverflowError。

@Holger解决方案

 IntStream primes=from(2).filter(i->p.test(i)).peek(i->p=p.and(v->v%i!=0)); primes.forEach(System.out::println); 

结果:素数= 368089后的StackOverflowError。

我的解决方案(StreamEx与谓词连接)

 sieve(StreamEx.iterate(2, x -> x+1), i -> true).forEach(System.out::println); 

结果:素数= 368287后的StackOverflowError。


因此,涉及谓词连接的三个解决方案获胜,因为每个新条件仅增加2个堆栈帧。 我认为,它们之间的差异是微不足道的,不应该被视为定义胜利者。 但是,我更喜欢我的第一个StreamEx解决方案,因为它更类似于Scala代码。

如果您不介意使用第三方库cyclops-streams ,我写的库有很多潜在的解决方案。

StreamUtils类有大量的静态方法,可以直接使用java.util.stream.Streams包括headAndTail

 HeadAndTail headAndTail = StreamUtils.headAndTail(Stream.of(1,2,3,4)); int head = headAndTail.head(); //1 Stream tail = headAndTail.tail(); //Stream[2,3,4] 

Streamable类表示可重放的Stream并通过构建惰性缓存中间数据结构来工作。 因为它是缓存和偿还 – 头部和尾部可以直接和单独实现。

 Streamable replayable= Streamable.fromStream(Stream.of(1,2,3,4)); int head = repayable.head(); //1 Stream tail = replayable.tail(); //Stream[2,3,4] 

cyclops-stream还提供顺序的Stream扩展,后者又扩展了jOOλ,并且具有基于Tuple (来自jOOλ)和域对象(HeadAndTail)解决方案,用于头部和尾部提取。

 SequenceM.of(1,2,3,4) .splitAtHead(); //Tuple[1,SequenceM[2,3,4] SequenceM.of(1,2,3,4) .headAndTail(); 

每个Tagir的请求更新 – >使用SequenceM的Scala筛的Java版本

 public void sieveTest(){ sieve(SequenceM.range(2, 1_000)).forEach(System.out::println); } SequenceM sieve(SequenceM s){ return s.headAndTailOptional().map(ht ->SequenceM.of(ht.head()) .appendStream(sieve(ht.tail().filter(n -> n % ht.head() != 0)))) .orElse(SequenceM.of()); } 

另一个版本Streamable

 public void sieveTest2(){ sieve(Streamable.range(2, 1_000)).forEach(System.out::println); } Streamable sieve(Streamable s){ return s.size()==0? Streamable.of() : Streamable.of(s.head()) .appendStreamable(sieve(s.tail() .filter(n -> n % s.head() != 0))); } 

注意 – SequenceM Streamable都没有Empty实现 – 因此Streamable的大小检查和Streamable的使用。

最后一个版本使用普通的java.util.stream.Stream

 import static com.aol.cyclops.streams.StreamUtils.headAndTailOptional; public void sieveTest(){ sieve(IntStream.range(2, 1_000).boxed()).forEach(System.out::println); } Stream sieve(Stream s){ return headAndTailOptional(s).map(ht ->Stream.concat(Stream.of(ht.head()) ,sieve(ht.tail().filter(n -> n % ht.head() != 0)))) .orElse(Stream.of()); } 

另一个更新 – 基于@ Holger使用对象而不是基元的版本的懒惰迭代(注意原始版本也是可能的)

  final Mutable> predicate = Mutable.of(x->true); SequenceM.iterate(2, n->n+1) .filter(i->predicate.get().test(i)) .peek(i->predicate.mutate(p-> p.and(v -> v%i!=0))) .limit(100000) .forEach(System.out::println); 

要获得头和尾,您需要一个Lazy Stream实现。 Java 8流或RxJava不适合。

您可以使用例如LazySeq ,如下所示。

使用非常便宜的first / rest分解(head()和tail())从一开始就遍历了延迟序列

LazySeq实现了java.util.List接口,因此可以在各种场所使用。 此外,它还对集合实现了Java 8增强,即流和收集器


 package com.company; import com.nurkiewicz.lazyseq.LazySeq; public class Main { public static void main(String[] args) { LazySeq ints = integers(2); LazySeq primes = sieve(ints); primes.take(10).forEach(p -> System.out.println(p)); } private static LazySeq sieve(LazySeq s) { return LazySeq.cons(s.head(), () -> sieve(s.filter(x -> x % s.head() != 0))); } private static LazySeq integers(int from) { return LazySeq.cons(from, () -> integers(from + 1)); } } 

这是另一个使用Holger建议的方法。 它使用RxJava来增加使用take(int)方法和其他许多方法的可能性。

 package com.company; import rx.Observable; import java.util.function.IntPredicate; import java.util.stream.IntStream; public class Main { public static void main(String[] args) { final IntPredicate[] p={(x)->true}; IntStream primesStream=IntStream.iterate(2,n->n+1).filter(i -> p[0].test(i)).peek(i->p[0]=p[0].and(v->v%i!=0) ); Observable primes = Observable.from(()->primesStream.iterator()); primes.take(10).forEach((x) -> System.out.println(x.toString())); } } 

如果你想获得一个流的头,只需:

 IntStream.range(1, 5).first(); 

如果你想得到一个流的尾部,只需:

 IntStream.range(1, 5).skip(1); 

如果你想得到一个流的头部和尾部,只需:

 IntStream s = IntStream.range(1, 5); int head = s.head(); IntStream tail = s.tail(); 

如果你想找到素数,只需:

 LongStream.range(2, n) .filter(i -> LongStream.range(2, (long) Math.sqrt(i) + 1).noneMatch(j -> i % j == 0)) .forEach(N::println); 

如果您想了解更多信息,请访问AbacusUtil

声明:我是AbacusUtil的开发人员。