为什么stream.spliterator()的tryAdvance可能会将项目累积到缓冲区中?
从Stream
管道获取Spliterator
可能会返回StreamSpliterators.WrappingSpliterator的实例。 例如,获取以下Spliterator
:
Spliterator source = new Random() .ints(11, 0, 7) // size, origin, bound .filter(nr -> nr % 2 != 0) .mapToObj(Integer::toString) .spliterator();
给定上面的Spliterator source
,当我们通过tryAdvance (Consumer consumer)
方法单独遍历元素时,在这种情况下,它是StreamSpliterators.WrappingSpliterator的一个实例,它将首先将项目累积到内部缓冲区,在使用这些项之前,我们可以在StreamSpliterators.java#298中看到。 从简单的角度来看, doAdvance()
首先将项插入buffer
,然后获取下一个项并将其传递给consumer.accept (…)
。
public boolean tryAdvance(Consumer consumer) { boolean hasNext = doAdvance(); if (hasNext) consumer.accept(buffer.get(nextToConsume)); return hasNext; }
但是,我不知道需要这个buffer
。
在这种情况下,为什么tryAdvance
的consumer
参数不仅仅用作管道的终端Sink
?
我大多同意@Holger的回答,但我会以不同的方式加入口音。 我认为你很难理解缓冲区的必要性,因为你有一个非常简单的Stream API允许的心智模型。 如果将Stream视为一系列map
和filter
,则不需要额外的缓冲区,因为这些操作具有两个重要的“好”属性:
- 一次处理一个元素
- 结果产生0或1个元素
然而,在一般情况下,这些并非如此。 正如@Holger( 我的原始答案中所提到的)提到Java 8中的flatMap
已经破坏了规则#2而在Java 9中它们最终添加了实际上在整个Stream
上转换的takeWhile – > Stream
而不是每个元素基础(这是AFAIK第一个中间衬衫电路操作)。
我不太同意@Holger的另一点是,我认为最根本的原因与他在第二段(即a)中所提出的有点不同,你可以在Stream
的末尾多次调用tryAdvance
b)“ 没有保证呼叫者将永远通过同一消费者 ”)。 我认为最重要的原因是Spliterator
在function上与Stream
相同,必须支持短路和懒惰(即不处理整个Stream
能力,否则它不能支持未绑定的流)。 换句话说,即使Spliterator API(非常奇怪)要求您必须为给定Spliterator
的所有方法的所有调用使用相同的Consumer
对象,您仍然需要tryAdvance
并且tryAdvance
实现仍然必须使用一些缓冲区。 如果你所拥有的只是forEachRemaining(Consumer super T> )
,你就无法停止处理数据,所以你不能实现类似于findFirst
或takeWhile
任何东西。 实际上,这是JDK实现内部使用Sink
接口而不是Consumer
(以及wrapAndCopyInto
“wrap”代表)的原因之一: Sink
有另外的boolean cancellationRequested()
方法。
总结一下 :需要一个缓冲区,因为我们需要Spliterator
:
- 使用简单的
Consumer
,它无法报告处理/取消的结束 - 提供通过(逻辑)消费者的请求停止处理数据的手段。
请注意,这两个实际上是略微矛盾的要求。
示例和一些代码
在这里,我想提供一些代码示例,我相信在没有额外缓冲区的情况下,如果给出当前API契约(接口),则无法实现这些代码。 此示例基于您的示例。
有一个简单的Collatz整数序列被推测总是最终命中1.AFAIK这个猜想还没有被certificate,但是已经validation了很多整数(至少对于整个32位int范围)。
因此,假设我们要解决的问题如下:从1到1,000,000范围内的随机起始编号的Collatz序列流中,找到第一个在其十进制表示中包含“123”的数字。
这是一个仅使用Stream
(不是Spliterator
)的解决方案:
static String findGoodNumber() { return new Random() .ints(1, 1_000_000) // unbound! .flatMap(nr -> collatzSequence(nr)) .mapToObj(Integer::toString) .filter(s -> s.contains("123")) .findFirst().get(); }
其中collatzSequence
是一个函数,它返回包含Collatz序列的Stream
,直到第一个1(并且对于nitpickers,当当前值大于Integer.MAX_VALUE /3
时它也会停止,所以我们不会遇到溢出)。
collatzSequence
返回的每个这样的Stream
都是绑定的。 标准的Random
也将最终生成所提供范围内的每个数字。 这意味着我们可以保证流中最终会有一些“好”的数字(例如123
),而findFirst
是短路的,所以整个操作实际上会终止。 但是,没有合理的Stream API实现可以预测这一点。
现在让我们假设您出于某些奇怪的原因想要使用中间Spliterator
执行相同的操作。 即使您只有一个逻辑且不需要不同的Consumer
,也不能使用forEachRemaining
。 所以你必须做这样的事情:
static Spliterator createCollatzRandomSpliterator() { return new Random() .ints(1, 1_000_000) // unbound! .flatMap(nr -> collatzSequence(nr)) .mapToObj(Integer::toString) .spliterator(); } static String findGoodNumberWithSpliterator() { Spliterator source = createCollatzRandomSpliterator(); String[] res = new String[1]; // work around for "final" closure restriction while (source.tryAdvance(s -> { if (s.contains("123")) { res[0] = s; } })) { if (res[0] != null) return res[0]; } throw new IllegalStateException("Impossible"); }
同样重要的是,对于一些起始数字,Collatz序列将包含几个匹配的数字。 例如, 41123
和123370
(= 41123 * 3 + 1)都包含“123”。 这意味着我们真的不希望我们的Consumer
在第一次匹配命中后被调用。 但由于Consumer
没有公开任何报告处理结束的方法, WrappingSpliterator
不能将我们的Consumer
传递给内部Spliterator
。 唯一的解决方案是将内部flatMap
所有结果(包括所有后处理)累积到某个缓冲区中,然后一次迭代该缓冲区一个元素。
请记住,这是由public
方法Stream.spliterator()
返回的Spliterator
,因此不能对调用者进行任何假设(只要它在合同中)。
tryAdvance
方法可以为每个流的元素调用一次,再一次检测流的结束,实际上,即使在结束之后它也可能被调用任意次数。 并且没有保证呼叫者将始终通过同一个消费者。
要将消费者直接传递给源分裂器而不进行缓冲,您将必须组成将执行所有流水线阶段的消费者,即调用映射函数并使用其结果或测试谓词,如果否定则不调用下游消费者等等。 传递给源分裂器的消费者也有责任以某种方式通知WrappingSpliterator
filter拒绝的值,因为源分裂器的tryAdvance
方法在这种情况下仍然返回true
,然后必须重复该操作。
正如Eugene正确提到的那样 ,这是一个一刀切的实现,不考虑有多少或哪种管道阶段。 组成这样的消费者的成本可能 tryAdvance
,并且可能必须为每个tryAdvance
调用重新应用,为每个流元素读取,例如当不同的消费者传递给tryAdvance
或者当相等性检查不起作用时。 请记住,消费者通常被实现为lambda表达式,并且未指定lambda表达式生成的实例的标识或相等性。
因此, tryAdvance
实现通过在第一次调用时只组合一个消费者实例来避免这些成本,这些实例将始终将元素存储到同一缓冲区中,如果没有被filter拒绝,也会在第一次调用时分配。 请注意,在正常情况下,缓冲区只能容纳一个元素。 Afaik, flatMap
是唯一可以将更多元素推送到缓冲区的操作 。 但请注意, flatMap
的这种非惰性行为的存在也是为什么需要这种缓冲策略的原因,至少在涉及flatMap
时,以确保由public
方法Spliterator
实现将履行传递合同在一次调用tryAdvance
期间,消费者中最多的一个元素。
相反,当您调用forEachRemaining
,这些问题不存在。 在整个操作过程中只有一个Consumer
实例, flatMap
的非懒惰也无关紧要,因为所有元素都会被消耗掉。 因此,只要没有先前的tryAdvance
调用可能导致某些元素的缓冲,就会尝试非缓冲传输:
public void forEachRemaining(Consumer super P_OUT> consumer) { if (buffer == null && !finished) { Objects.requireNonNull(consumer); init(); ph.wrapAndCopyInto((Sink) consumer::accept, spliterator); finished = true; } else { do { } while (tryAdvance(consumer)); } }
如您所见,只要buffer
尚未初始化,即之前没有进行tryAdvance
调用, tryAdvance
consumer::accept
绑定为Sink
并进行完整的直接传输。
Spliterators
用于处理遭遇顺序中每个项目的顺序处理,以及按某种顺序并行处理项目。 Spliterator
每个方法Spliterator
必须能够支持早期绑定和后期绑定。 缓冲旨在将数据收集到合适的可处理块中,这些块遵循排序,并行化和可变性的要求。
换句话说, tryAdvance()
不是类中唯一的方法,其他方法必须相互tryAdvance()
才能提供外部契约。 要做到这一点,面对可能覆盖部分或全部方法的子类,需要每个方法都遵守其内部契约。
这是我在Holger的几篇文章中读到的内容,我将在这里总结一下; 如果有一定的确切重复(我会尝试找到一个) – 我会关闭并删除我的答案。
首先,这就是为什么首先需要使用WrappingSpliterator
– 对于有状态的操作,如sorted
, distinct
等 – 但我认为你已经理解了这一点。 我也认为flatMap
也是如此 – 因为它很渴望。
现在,当你调用spliterator
,IFF没有有状态的操作,显然没有真正的理由将它包装到WrappingSpliterator
,但目前还没有完成。 这可以在将来的版本中进行更改 – 在调用spliterator
之前,他们可以检测是否存在有stateful operations
; 但他们现在不这样做,只是将每个操作都视为有状态 ,从而将其WrappingSpliterator
到WrappingSpliterator