注册流“完成”钩子

使用Java 8 Stream API,我想注册一个“完成挂钩”,其行如下:

 Stream stream = Stream.of("a", "b", "c"); // additional filters / mappings that I don't control stream.onComplete((Completion c) -> { // This is what I'd like to do: closeResources(); // This might also be useful: Optional exception = c.exception(); exception.ifPresent(e -> throw new ExceptionWrapper(e)); }); 

我想这样做的原因是因为我想将一个资源包装在Stream供API客户端使用,我希望该Stream在消耗后自动清理资源。 如果可能,那么客户可以致电:

 Collected collectedInOneGo = Utility.something() .niceLookingSQLDSL() .moreDSLFeatures() .stream() .filter(a -> true) .map(c -> c) .collect(collector); 

而不是目前所需要的:

 try (Stream meh = Utility.something() .niceLookingSQLDSL() .moreDSLFeatures() .stream()) { Collected collectedWithUglySyntacticDissonance = meh.filter(a -> true) .map(c -> c) .collect(collector); } 

理想情况下,我想进入java.util.stream.ReferencePipeline的各种方法,例如:

 @Override final void forEachWithCancel(Spliterator

spliterator, Sink

sink) { try { // Existing loop do { } while (!sink.cancellationRequested() && spliterator.tryAdvance(sink)); } // These would be nice: catch (Throwable t) { completion.onFailure(t); } finally { completion.onSuccess(); } }

使用现有的JDK 8 API有一种简单的方法吗?

除了基于flatMap的解决方案(由@Holger提出)之外,拦截终端操作的任何解决方案对于以下代码都是脆弱的:

 Stream stream = getAutoCloseableStream(); if(stream.iterator().hasNext()) { // do something if stream is non-empty } 

这种用法在规范中是绝对合法的。 不要忘记iterator()spliterator()是终端流操作,但是在执行它们之后仍然需要访问流源。 在任何状态下放弃IteratorSpliterator也是完全有效的,所以你无法知道它是否会被进一步使用。

您可以考虑建议用户不要使用iterator()spliterator() ,但这段代码呢?

 Stream stream = getAutoCloseableStream(); Stream.concat(stream, Stream.of("xyz")).findFirst(); 

这内部对第一个流使用spliterator().tryAdvance() ,然后放弃它(如果显式调用结果流close() ,则关闭它)。 您需要让用户不要使用Stream.concat 。 据我所知,你在库中内部经常使用iterator() / spliterator() ,所以你需要重新访问所有这些地方以解决可能出现的问题。 当然,还有很多其他库也使用iterator() / spliterator() ,之后可能会短路:所有这些库都会与你的function不兼容。

为什么基于flatMap的解决方案适用于此? 因为在第一次调用hasNext()tryAdvance()它会将整个流内容转储到中间缓冲区并关闭原始流源。 因此,根据流大小,您可能会浪费很多中间内存,甚至会出现OutOfMemoryError

您还可以考虑将PhantomReference保留到Stream对象并监视ReferenceQueue 。 在这种情况下,完成将由垃圾收集器触发(这也有一些缺点)。

总之,我的建议是继续尝试资源。

最简单的解决方案是将流包装在另一个流中并将其平面映射到自身:

 // example stream Stream original=Stream.of("bla").onClose(()->System.out.println("close action")); // this is the trick Stream autoClosed=Stream.of(original).flatMap(Function.identity()); //example op int sum=autoClosed.mapToInt(String::length).sum(); System.out.println(sum); 

它的工作原理在于flatMap操作 :

每个映射的流在其内容放入此流后关闭。

但是当前的实现并不像使用flatMap时那样懒惰 。 这已在Java 10中修复。


我的建议是在需要关闭返回的流时继续使用try(…)标准解决方案和文档。 毕竟,在终端操作之后关闭资源的流是不安全的,因为没有保证客户端实际上将调用终端操作。 改变它的想法并放弃流即时是一种有效的用法,而当文档指定它是必需的时,不调用close()方法则不是。

Java 8已经开创了需要关闭的流如何运作的先例。 在他们的Javadoc中 ,它提到:

Streams有一个BaseStream.close()方法并实现AutoCloseable,但几乎所有的流实例实际上都不需要在使用后关闭。 通常,只有源为IO通道的流(例如Files.lines(Path,Charset)返回的流)才需要关闭。 大多数流都由集合,数组或生成函数支持,不需要特殊的资源管理。 (如果流确实需要关闭,则可以在try-with-resources语句中将其声明为资源。)

所以Java 8的建议是在try-with-resources中打开这些流。 一旦你这样做, Stream 提供了一种方法来添加一个关闭钩子,几乎就像你所描述的那样: onClose(Runnable) ,它接受一个lambda告诉它该做什么并返回一个也会这样做的Stream关闭时的操作。

这就是API设计和文档建议你做你想做的事情的方式。

我提出的解决方案看起来像这样:

 class AutoClosingStream implements Stream { AutoClosingStream(Stream delegate, Consumer> onComplete) {} // Pipeline ops delegate the op to the real stream and wrap that again @Override public Stream limit(long maxSize) { return new AutoClosingStream(delegate.limit(maxSize), onComplete); } // Terminal ops intercept the result and call the onComplete logic @Override public void forEach(Consumer action) { terminalOp(() -> delegate.forEach(action)); } private void terminalOp(Runnable runnable) { terminalOp(() -> { runnable.run(); return null; }); } private  R terminalOp(Supplier supplier) { R result = null; try { result = supplier.get(); onComplete.accept(Optional.empty()); } catch (Throwable e) { onComplete.accept(Optional.of(e)); Utils.sneakyThrow(e); } return result; } } 

这只是一个简化的草图来说明这个想法。 真正的解决方案还将支持原始IntStreamLongStreamDoubleStream

在开源项目中查看AutoClosingReferenceStream,AutoClosingIntStream,AutoClosingLongStream和AutoClosingDoubleStream的这些完整实现.Speedment https://github.com/speedment/speedment/tree/master/src/main/java/com/speedment/internal/core/流/自动关闭

该解决方案类似于@LukasEder提到的解决方案