注册流“完成”钩子
使用Java 8 Stream
API,我想注册一个“完成挂钩”,其行如下:
Stream stream = Stream.of("a", "b", "c"); // additional filters / mappings that I don't control stream.onComplete((Completion c) -> { // This is what I'd like to do: closeResources(); // This might also be useful: Optional exception = c.exception(); exception.ifPresent(e -> throw new ExceptionWrapper(e)); });
我想这样做的原因是因为我想将一个资源包装在Stream
供API客户端使用,我希望该Stream
在消耗后自动清理资源。 如果可能,那么客户可以致电:
Collected collectedInOneGo = Utility.something() .niceLookingSQLDSL() .moreDSLFeatures() .stream() .filter(a -> true) .map(c -> c) .collect(collector);
而不是目前所需要的:
try (Stream meh = Utility.something() .niceLookingSQLDSL() .moreDSLFeatures() .stream()) { Collected collectedWithUglySyntacticDissonance = meh.filter(a -> true) .map(c -> c) .collect(collector); }
理想情况下,我想进入java.util.stream.ReferencePipeline
的各种方法,例如:
@Override final void forEachWithCancel(Spliterator spliterator, Sink
sink) { try { // Existing loop do { } while (!sink.cancellationRequested() && spliterator.tryAdvance(sink)); } // These would be nice: catch (Throwable t) { completion.onFailure(t); } finally { completion.onSuccess(); } }
使用现有的JDK 8 API有一种简单的方法吗?
除了基于flatMap
的解决方案(由@Holger提出)之外,拦截终端操作的任何解决方案对于以下代码都是脆弱的:
Stream stream = getAutoCloseableStream(); if(stream.iterator().hasNext()) { // do something if stream is non-empty }
这种用法在规范中是绝对合法的。 不要忘记iterator()
和spliterator()
是终端流操作,但是在执行它们之后仍然需要访问流源。 在任何状态下放弃Iterator
或Spliterator
也是完全有效的,所以你无法知道它是否会被进一步使用。
您可以考虑建议用户不要使用iterator()
和spliterator()
,但这段代码呢?
Stream stream = getAutoCloseableStream(); Stream.concat(stream, Stream.of("xyz")).findFirst();
这内部对第一个流使用spliterator().tryAdvance()
,然后放弃它(如果显式调用结果流close()
,则关闭它)。 您需要让用户不要使用Stream.concat
。 据我所知,你在库中内部经常使用iterator()
/ spliterator()
,所以你需要重新访问所有这些地方以解决可能出现的问题。 当然,还有很多其他库也使用iterator()
/ spliterator()
,之后可能会短路:所有这些库都会与你的function不兼容。
为什么基于flatMap
的解决方案适用于此? 因为在第一次调用hasNext()
或tryAdvance()
它会将整个流内容转储到中间缓冲区并关闭原始流源。 因此,根据流大小,您可能会浪费很多中间内存,甚至会出现OutOfMemoryError
。
您还可以考虑将PhantomReference
保留到Stream
对象并监视ReferenceQueue
。 在这种情况下,完成将由垃圾收集器触发(这也有一些缺点)。
总之,我的建议是继续尝试资源。
最简单的解决方案是将流包装在另一个流中并将其平面映射到自身:
// example stream Stream original=Stream.of("bla").onClose(()->System.out.println("close action")); // this is the trick Stream autoClosed=Stream.of(original).flatMap(Function.identity()); //example op int sum=autoClosed.mapToInt(String::length).sum(); System.out.println(sum);
它的工作原理在于flatMap
操作 :
每个映射的流在其内容放入此流后关闭。
但是当前的实现并不像使用flatMap
时那样懒惰 。 这已在Java 10中修复。
我的建议是在需要关闭返回的流时继续使用try(…)
标准解决方案和文档。 毕竟,在终端操作之后关闭资源的流是不安全的,因为没有保证客户端实际上将调用终端操作。 改变它的想法并放弃流即时是一种有效的用法,而当文档指定它是必需的时,不调用close()
方法则不是。
Java 8已经开创了需要关闭的流如何运作的先例。 在他们的Javadoc中 ,它提到:
Streams有一个BaseStream.close()方法并实现AutoCloseable,但几乎所有的流实例实际上都不需要在使用后关闭。 通常,只有源为IO通道的流(例如Files.lines(Path,Charset)返回的流)才需要关闭。 大多数流都由集合,数组或生成函数支持,不需要特殊的资源管理。 (如果流确实需要关闭,则可以在try-with-resources语句中将其声明为资源。)
所以Java 8的建议是在try-with-resources中打开这些流。 一旦你这样做, Stream
还提供了一种方法来添加一个关闭钩子,几乎就像你所描述的那样: onClose(Runnable)
,它接受一个lambda告诉它该做什么并返回一个也会这样做的Stream
关闭时的操作。
这就是API设计和文档建议你做你想做的事情的方式。
我提出的解决方案看起来像这样:
class AutoClosingStream implements Stream { AutoClosingStream(Stream delegate, Consumer> onComplete) {} // Pipeline ops delegate the op to the real stream and wrap that again @Override public Stream limit(long maxSize) { return new AutoClosingStream(delegate.limit(maxSize), onComplete); } // Terminal ops intercept the result and call the onComplete logic @Override public void forEach(Consumer super T> action) { terminalOp(() -> delegate.forEach(action)); } private void terminalOp(Runnable runnable) { terminalOp(() -> { runnable.run(); return null; }); } private R terminalOp(Supplier supplier) { R result = null; try { result = supplier.get(); onComplete.accept(Optional.empty()); } catch (Throwable e) { onComplete.accept(Optional.of(e)); Utils.sneakyThrow(e); } return result; } }
这只是一个简化的草图来说明这个想法。 真正的解决方案还将支持原始IntStream
, LongStream
和DoubleStream
在开源项目中查看AutoClosingReferenceStream,AutoClosingIntStream,AutoClosingLongStream和AutoClosingDoubleStream的这些完整实现.Speedment https://github.com/speedment/speedment/tree/master/src/main/java/com/speedment/internal/core/流/自动关闭
该解决方案类似于@LukasEder提到的解决方案