如何在Java中处理多个流?

我正在尝试运行一个进程,并对其输入,输出和错误流进行处理。 显而易见的方法是使用类似select()东西,但我在Java中唯一可以找到的是Selector.select() ,它接收一个Channel 。 似乎不可能从InputStreamOutputStream获取ChannelFileStream有一个getChannel()方法,但这在这里没有帮助)

所以,我写了一些代码来轮询所有的流:

 while( !out_eof || !err_eof ) { while( out_str.available() ) { if( (bytes = out_str.read(buf)) != -1 ) { // Do something with output stream } else out_eof = true; } while( err_str.available() ) { if( (bytes = err_str.read(buf)) != -1 ) { // Do something with error stream } else err_eof = true; } sleep(100); } 

哪个有效,但它永远不会终止。 当其中一个流到达文件末尾时, available()返回零,因此不调用read()并且我们永远不会得到指示EOF的-1返回。

一种解决方案是检测EOF的非阻塞方式。 我无法在任何地方看到文档中的一个。 或者,有更好的方式做我想做的事情吗?

我在这里看到这个问题: 链接文本虽然它并不完全符合我的要求,但我可能会使用这个想法,为每个流产生单独的线程,以解决我现在遇到的特殊问题。 但当然这不是唯一的方法吗? 当然必须有一种方法可以从多个流中读取而不使用每个流程的线程?

正如您所说, 本答案中概述的解决方案是从流程中读取stdout和stderr的传统方式。 每个线程的流程是可行的方法,即使它有点烦人。

您确实必须为要监视的每个流生成一个Thread的路径。 如果您的用例允许组合相关进程的stdout和stderr,则只需要一个线程,否则需要两个线程。

我花了很长时间才能在我们的一个项目中做到正确,我必须启动一个外部进程,获取其输出并对其执行某些操作,同时查找错误和进程终止并且还能够终止它当java应用程序的用户取消操作时。

我创建了一个相当简单的类来封装观察部分,其run()方法看起来像这样:

 public void run() { BufferedReader tStreamReader = null; try { while (externalCommand == null && !shouldHalt) { logger.warning("ExtProcMonitor(" + (watchStdErr ? "err" : "out") + ") Sleeping until external command is found"); Thread.sleep(500); } if (externalCommand == null) { return; } tStreamReader = new BufferedReader(new InputStreamReader(watchStdErr ? externalCommand.getErrorStream() : externalCommand.getInputStream())); String tLine; while ((tLine = tStreamReader.readLine()) != null) { logger.severe(tLine); if (filter != null) { if (filter.matches(tLine)) { informFilterListeners(tLine); return; } } } } catch (IOException e) { logger.logExceptionMessage(e, "IOException stderr"); } catch (InterruptedException e) { logger.logExceptionMessage(e, "InterruptedException waiting for external process"); } finally { if (tStreamReader != null) { try { tStreamReader.close(); } catch (IOException e) { // ignore } } } } 

在呼叫方面,它看起来像这样:

  Thread tExtMonitorThread = new Thread(new Runnable() { public void run() { try { while (externalCommand == null) { getLogger().warning("Monitor: Sleeping until external command is found"); Thread.sleep(500); if (isStopRequested()) { getLogger() .warning("Terminating external process on user request"); if (externalCommand != null) { externalCommand.destroy(); } return; } } int tReturnCode = externalCommand.waitFor(); getLogger().warning("External command exited with code " + tReturnCode); } catch (InterruptedException e) { getLogger().logExceptionMessage(e, "Interrupted while waiting for external command to exit"); } } }, "ExtCommandWaiter"); ExternalProcessOutputHandlerThread tExtErrThread = new ExternalProcessOutputHandlerThread("ExtCommandStdErr", getLogger(), true); ExternalProcessOutputHandlerThread tExtOutThread = new ExternalProcessOutputHandlerThread("ExtCommandStdOut", getLogger(), true); tExtMonitorThread.start(); tExtOutThread.start(); tExtErrThread.start(); tExtErrThread.setFilter(new FilterFunctor() { public boolean matches(Object o) { String tLine = (String)o; return tLine.indexOf("Error") > -1; } }); FilterListener tListener = new FilterListener() { private boolean abortFlag = false; public boolean shouldAbort() { return abortFlag; } public void matched(String aLine) { abortFlag = abortFlag || (aLine.indexOf("Error") > -1); } }; tExtErrThread.addFilterListener(tListener); externalCommand = new ProcessBuilder(aCommand).start(); tExtErrThread.setProcess(externalCommand); try { tExtMonitorThread.join(); tExtErrThread.join(); tExtOutThread.join(); } catch (InterruptedException e) { // when this happens try to bring the external process down getLogger().severe("Aborted because auf InterruptedException."); getLogger().severe("Killing external command..."); externalCommand.destroy(); getLogger().severe("External command killed."); externalCommand = null; return -42; } int tRetVal = tListener.shouldAbort() ? -44 : externalCommand.exitValue(); externalCommand = null; try { getLogger().warning("command exit code: " + tRetVal); } catch (IllegalThreadStateException ex) { getLogger().warning("command exit code: unknown"); } return tRetVal; 

不幸的是,我不需要一个自包含的可运行的例子,但也许这有帮助。 如果我不得不再次这样做,我会再看一下使用Thread.interrupt()方法而不是自制的停止标志(请注意它是挥发性的!),但是我将其留下了一段时间。 🙂