为什么stream.spliterator()的tryAdvance可能会将项目累积到缓冲区中?

Stream管道获取Spliterator可能会返回StreamSpliterators.WrappingSpliterator的实例。 例如,获取以下Spliterator

 Spliterator source = new Random() .ints(11, 0, 7) // size, origin, bound .filter(nr -> nr % 2 != 0) .mapToObj(Integer::toString) .spliterator(); 

给定上面的Spliterator source ,当我们通过tryAdvance (Consumer consumer)方法单独遍历元素时,在这种情况下,它是StreamSpliterators.WrappingSpliterator的一个实例,它将首先将项目累积到内部缓冲区,在使用这些项之前,我们可以在StreamSpliterators.java#298中看到。 从简单的角度来看, doAdvance()首先将项插入buffer ,然后获取下一个项并将其传递给consumer.accept (…)

 public boolean tryAdvance(Consumer consumer) { boolean hasNext = doAdvance(); if (hasNext) consumer.accept(buffer.get(nextToConsume)); return hasNext; } 

但是,我不知道需要这个buffer

在这种情况下,为什么tryAdvanceconsumer参数不仅仅用作管道的终端Sink

我大多同意@Holger的回答,但我会以不同的方式加入口音。 我认为你很难理解缓冲区的必要性,因为你有一个非常简单的Stream API允许的心智模型。 如果将Stream视为一系列mapfilter ,则不需要额外的缓冲区,因为这些操作具有两个重要的“好”属性:

  1. 一次处理一个元素
  2. 结果产生0或1个元素

然而,在一般情况下,这些并非如此。 正如@Holger( 我的原始答案中所提到的)提到Java 8中的flatMap已经破坏了规则#2而在Java 9中它们最终添加了实际上在整个Stream上转换的takeWhile – > Stream而不是每个元素基础(这是AFAIK第一个中间衬衫电路操作)。

我不太同意@Holger的另一点是,我认为最根本的原因与他在第二段(即a)中所提出的有点不同,你可以在Stream的末尾多次调用tryAdvance b)“ 没有保证呼叫者将永远通过同一消费者 ”)。 我认为最重要的原因是Spliterator在function上与Stream相同,必须支持短路和懒惰(即不处理整个Stream能力,否则它不能支持未绑定的流)。 换句话说,即使Spliterator API(非常奇怪)要求您必须为给定Spliterator的所有方法的所有调用使用相同的Consumer对象,您仍然需要tryAdvance并且tryAdvance实现仍然必须使用一些缓冲区。 如果你所拥有的只是forEachRemaining(Consumer ) ,你就无法停止处理数据,所以你不能实现类似于findFirsttakeWhile任何东西。 实际上,这是JDK实现内部使用Sink接口而不是Consumer (以及wrapAndCopyInto “wrap”代表)的原因之一: Sink有另外的boolean cancellationRequested()方法。

总结一下 :需要一个缓冲区,因为我们需要Spliterator

  1. 使用简单的Consumer ,它无法报告处理/取消的结束
  2. 提供通过(逻辑)消费者的请求停止处理数据的手段。

请注意,这两个实际上是略微矛盾的要求。

示例和一些代码

在这里,我想提供一些代码示例,我相信在没有额外缓冲区的情况下,如果给出当前API契约(接口),则无法实现这些代码。 此示例基于您的示例。

有一个简单的Collat​​z整数序列被推测总是最终命中1.AFAIK这个猜想还没有被certificate,但是已经validation了很多整数(至少对于整个32位int范围)。

因此,假设我们要解决的问题如下:从1到1,000,000范围内的随机起始编号的Collat​​z序列流中,找到第一个在其十进制表示中包含“123”的数字。

这是一个仅使用Stream (不是Spliterator )的解决方案:

 static String findGoodNumber() { return new Random() .ints(1, 1_000_000) // unbound! .flatMap(nr -> collatzSequence(nr)) .mapToObj(Integer::toString) .filter(s -> s.contains("123")) .findFirst().get(); } 

其中collatzSequence是一个函数,它返回包含Collat​​z序列的Stream ,直到第一个1(并且对于nitpickers,当当前值大于Integer.MAX_VALUE /3时它也会停止,所以我们不会遇到溢出)。

collatzSequence返回的每个这样的Stream都是绑定的。 标准的Random也将最终生成所提供范围内的每个数字。 这意味着我们可以保证流中最终会有一些“好”的数字(例如123 ),而findFirst是短路的,所以整个操作实际上会终止。 但是,没有合理的Stream API实现可以预测这一点。

现在让我们假设您出于某些奇怪的原因想要使用中间Spliterator执行相同的操作。 即使您只有一个逻辑且不需要不同的Consumer ,也不能使用forEachRemaining 。 所以你必须做这样的事情:

 static Spliterator createCollatzRandomSpliterator() { return new Random() .ints(1, 1_000_000) // unbound! .flatMap(nr -> collatzSequence(nr)) .mapToObj(Integer::toString) .spliterator(); } static String findGoodNumberWithSpliterator() { Spliterator source = createCollatzRandomSpliterator(); String[] res = new String[1]; // work around for "final" closure restriction while (source.tryAdvance(s -> { if (s.contains("123")) { res[0] = s; } })) { if (res[0] != null) return res[0]; } throw new IllegalStateException("Impossible"); } 

同样重要的是,对于一些起始数字,Collat​​z序列将包含几个匹配的数字。 例如, 41123123370 (= 41123 * 3 + 1)都包含“123”。 这意味着我们真的不希望我们的Consumer在第一次匹配命中后被调用。 但由于Consumer没有公开任何报告处理结束的方法, WrappingSpliterator不能将我们的Consumer传递给内部Spliterator 。 唯一的解决方案是将内部flatMap所有结果(包括所有后处理)累积到某个缓冲区中,然后一次迭代该缓冲区一个元素。

请记住,这是由public方法Stream.spliterator()返回的Spliterator ,因此不能对调用者进行任何假设(只要它在合同中)。

tryAdvance方法可以为每个流的元素调用一次,再一次检测流的结束,实际上,即使在结束之后它也可能被调用任意次数。 并且没有保证呼叫者将始终通过同一个消费者。

要将消费者直接传递给源分裂器而不进行缓冲,您将必须组成将执行所有流水线阶段的消费者,即调用映射函数并使用其结果或测试谓词,如果否定则不调用下游消费者等等。 传递给源分裂器的消费者也有责任以某种方式通知WrappingSpliteratorfilter拒绝的值,因为源分裂器的tryAdvance方法在这种情况下仍然返回true ,然后必须重复该操作。

正如Eugene正确提到的那样 ,这是一个一刀切的实现,不考虑有多少或哪种管道阶段。 组成这样的消费者的成本可能 tryAdvance ,并且可能必须为每个tryAdvance调用重新应用,为每个流元素读取,例如当不同的消费者传递给tryAdvance或者当相等性检查不起作用时。 请记住,消费者通常被实现为lambda表达式,并且未指定lambda表达式生成的实例的标识或相等性。

因此, tryAdvance实现通过在第一次调用时只组合一个消费者实例来避免这些成本,这些实例将始终将元素存储到同一缓冲区中,如果没有被filter拒绝,也会在第一次调用时分配。 请注意,在正常情况下,缓冲区只能容纳一个元素。 Afaik, flatMap是唯一可以将更多元素推送到缓冲区的操作 。 但请注意, flatMap的这种非惰性行为的存在也是为什么需要这种缓冲策略的原因,至少在涉及flatMap时,以确保由public方法Spliterator实现将履行传递合同在一次调用tryAdvance期间,消费者中最多的一个元素。

相反,当您调用forEachRemaining ,这些问题不存在。 在整个操作过程中只有一个Consumer实例, flatMap的非懒惰也无关紧要,因为所有元素都会被消耗掉。 因此,只要没有先前的tryAdvance调用可能导致某些元素的缓冲,就会尝试非缓冲传输:

  public void forEachRemaining(Consumer consumer) { if (buffer == null && !finished) { Objects.requireNonNull(consumer); init(); ph.wrapAndCopyInto((Sink) consumer::accept, spliterator); finished = true; } else { do { } while (tryAdvance(consumer)); } } 

如您所见,只要buffer尚未初始化,即之前没有进行tryAdvance调用, tryAdvance consumer::accept绑定为Sink并进行完整的直接传输。

Spliterators用于处理遭遇顺序中每个项目的顺序处理,以及按某种顺序并行处理项目。 Spliterator每个方法Spliterator必须能够支持早期绑定和后期绑定。 缓冲旨在将数据收集到合适的可处理块中,这些块遵循排序,并行化和可变性的要求。

换句话说, tryAdvance()不是类中唯一的方法,其他方法必须相互tryAdvance()才能提供外部契约。 要做到这一点,面对可能覆盖部分或全部方法的子类,需要每个方法都遵守其内部契约。

这是我在Holger的几篇文章中读到的内容,我将在这里总结一下; 如果有一定的确切重复(我会尝试找到一个) – 我会关闭并删除我的答案。

首先,这就是为什么首先需要使用WrappingSpliterator – 对于有状态的操作,如sorteddistinct等 – 但我认为你已经理解了这一点。 我也认为flatMap也是如此 – 因为它很渴望。

现在,当你调用spliterator ,IFF没有有状态的操作,显然没有真正的理由将它包装到WrappingSpliterator ,但目前还没有完成。 这可以在将来的版本中进行更改 – 在调用spliterator之前,他们可以检测是否存在有stateful operations ; 但他们现在不这样做,只是将每个操作都视为有状态 ,从而将其WrappingSpliteratorWrappingSpliterator