在管道中间关闭流

当我执行此代码时,它会在流管道中打开大量文件:

public static void main(String[] args) throws IOException { Files.find(Paths.get("JAVA_DOCS_DIR/docs/api/"), 100, (path, attr) -> path.toString().endsWith(".html")) .map(file -> runtimizeException(() -> Files.lines(file, StandardCharsets.ISO_8859_1))) .map(Stream::count) .forEachOrdered(System.out::println); } 

我得到一个例外:

 java.nio.file.FileSystemException: /long/file/name: Too many open files 

问题是当Stream.count完成后, Stream.count不会关闭流。 但我不明白为什么它不应该,因为它是一个终端操作。 对于其他终端操作(例如reduceforEach 。 另一方面, flatMap关闭它所包含的流。

该文档告诉我使用try-with-resouces语句在必要时关闭流。 在我的情况下,我可以用这样的东西替换count行:

 .map(s -> { long c = s.count(); s.close(); return c; } ) 

但这是嘈杂和丑陋的,在某些情况下可能会给大型复杂的管道带来真正的不便。

所以我的问题如下:

  1. 为什么流设计不是为了让终端操作关闭他们正在处理的流? 这将使它们在IO流中更好地工作。
  2. 关闭流水线中IO流的最佳解决方案是什么?

runtimizeException是一个在RuntimeException中包装已检查exception的方法。

这里有两个问题:处理已检查的exception(如IOException )以及及时关闭资源。

没有任何预定义的function接口声明任何已检查的exception,这意味着它们必须在lambda中处理,或者包含在未经检查的exception中并重新抛出。 看起来你的runtimizeException函数就是这样做的。 您可能还必须为它声明自己的function接口。 你可能已经发现,这是一种痛苦。

在关闭像文件这样的资源时,有一些调查是在到达流末尾时自动关闭流。 这很方便,但是它不会在抛出exception时处理关闭。 在流中没有神奇的做正确的机制。

我们留下了处理资源闭包的标准Java技术,即Java 7中引入的try-with-resources构造.TWR确实希望在打开时将资源在调用堆栈中的同一级别关闭。 “打开它的人必须关闭它”的原则适用。 TWR还处理exception处理,这通常可以方便地在同一个地方处理exception处理和资源关闭。

在此示例中,流有点不寻常,因为它将Stream映射到Stream> 。 这些嵌套流是未关闭的流,当系统用完打开的文件描述符时会导致最终exception。 使这很困难的是文件通过一个流操作打开然后传递到下游; 这使得无法使用TWR。

构建此管道的另一种方法如下。

Files.lines调用是打开文件的调用,因此必须是TWR语句中的资源。 这个文件的处理是抛出(某些) IOExceptions地方,所以我们可以在同一个TWR语句中进行exception包装。 这表明有一个简单的函数将路径映射到行数,同时处理资源关闭和exception包装:

 long lineCount(Path path) { try (Stream s = Files.lines(path, StandardCharsets.ISO_8859_1)) { return s.count(); } catch (IOException ioe) { throw new UncheckedIOException(ioe); } } 

一旦你有了这个辅助函数,主管道就像这样:

 Files.find(Paths.get("JAVA_DOCS_DIR/docs/api/"), 100, (path, attr) -> path.toString().endsWith(".html")) .mapToLong(this::lineCount) .forEachOrdered(System.out::println); 

可以创建一个可靠地关闭管道中间的流的实用程序方法。

这可以确保使用try-with-resource-statement关闭每个资源,但是不需要自定义实用程序方法,并且比直接在lambda中编写try-statement要简单得多。

使用此方法,问题中的管道如下所示:

 Files.find(Paths.get("Java_8_API_docs/docs/api"), 100, (path, attr) -> path.toString().endsWith(".html")) .map(file -> applyAndClose( () -> Files.lines(file, StandardCharsets.ISO_8859_1), Stream::count)) .forEachOrdered(System.out::println); 

实现看起来像这样:

 /** * Applies a function to a resource and closes it afterwards. * @param sup Supplier of the resource that should be closed * @param op operation that should be performed on the resource before it is closed * @return The result of calling op.apply on the resource */ private static  B applyAndClose(Callable sup, Function op) { try (A res = sup.call()) { return op.apply(res); } catch (RuntimeException exc) { throw exc; } catch (Exception exc) { throw new RuntimeException("Wrapped in applyAndClose", exc); } } 

(由于需要关闭的资源在分配时经常会抛出exception,因此非运行时exception包含在运行时exception中,从而避免需要单独的方法来执行此操作。)

您需要在此流操作中调用close() ,这将导致调用所有底层关闭处理程序。

更好的是,将整个语句包装在try-with-resources块中,然后它将自动调用close处理程序。

在您的情况下这可能不太可能,这意味着您需要在某些操作中自己处理它。 您当前的方法可能根本不适合流。

看来你确实需要在你的第二个map()操作中做到这一点。

应该只调用一次AutoCloseable接口的关闭 。 有关更多信息,请参阅AutoCloseable的文档。

如果最终操作将自动关闭流,则可以调用close两次。 看一下下面的例子:

 try (Stream lines = Files.lines(path)) { lines.count(); } 

正如现在定义的那样,行上的close方法将只调用一次。 无论最终操作是否正常完成,或者操作是否在IOException中中止。 如果在最终操作中隐式地关闭流,则如果发生IOException ,则调用close方法一次,如果操作成功完成,则调用两次

这是一个替代方法,它使用Files另一个方法,并将避免泄漏文件描述符:

 Files.find(Paths.get("JAVA_DOCS_DIR/docs/api/"), 100, (path, attr) -> path.toString().endsWith(".html")) .map(file -> runtimizeException(() -> Files.readAllLines(file, StandardCharsets.ISO_8859_1).size()) .forEachOrdered(System.out::println); 

与您的版本不同,它将为行计数返回一个int而不是long ; 但你没有那么多行的文件,对吗?