具有并发输入/输出流的Java进程

我正在尝试创建一种控制台/终端,允许用户输入一个字符串,然后将其转换为一个进程并打印出结果。 就像一个普通的控制台。 但我无法管理输入/输出流。 我已经研究过这个post ,但是这个解决方案很遗憾不适用于我的问题。

与“ipconfig”和“cmd.exe”等标准命令一起,我需要能够运行脚本并使用相同的输入流来传递一些参数,如果脚本要求输入的话。

例如,在运行脚本“python pyScript.py”之后,如果它要求它(例如:raw_input),我应该能够将更多输入传递给脚本,同时还打印脚本的输出。 您期望从终端获得的基本行为。

到目前为止我得到了什么:

import java.awt.BorderLayout; import java.awt.Color; import java.awt.Dimension; import java.awt.event.KeyEvent; import java.awt.event.KeyListener; import java.io.BufferedReader; import java.io.BufferedWriter; import java.io.IOException; import java.io.InputStream; import java.io.InputStreamReader; import java.io.OutputStream; import java.io.OutputStreamWriter; import javax.swing.JFrame; import javax.swing.JPanel; import javax.swing.JScrollPane; import javax.swing.JTextPane; import javax.swing.text.BadLocationException; import javax.swing.text.Document; public class Console extends JFrame{ JTextPane inPane, outPane; InputStream inStream, inErrStream; OutputStream outStream; public Console(){ super("Console"); setPreferredSize(new Dimension(500, 600)); setLocationByPlatform(true); setDefaultCloseOperation(JFrame.EXIT_ON_CLOSE); // GUI outPane = new JTextPane(); outPane.setEditable(false); outPane.setBackground(new Color(20, 20, 20)); outPane.setForeground(Color.white); inPane = new JTextPane(); inPane.setBackground(new Color(40, 40, 40)); inPane.setForeground(Color.white); inPane.setCaretColor(Color.white); JPanel panel = new JPanel(new BorderLayout()); panel.add(outPane, BorderLayout.CENTER); panel.add(inPane, BorderLayout.SOUTH); JScrollPane scrollPanel = new JScrollPane(panel); getContentPane().add(scrollPanel); // LISTENER inPane.addKeyListener(new KeyListener(){ @Override public void keyPressed(KeyEvent e){ if(e.getKeyCode() == KeyEvent.VK_ENTER){ e.consume(); read(inPane.getText()); } } @Override public void keyTyped(KeyEvent e) {} @Override public void keyReleased(KeyEvent e) {} }); pack(); setVisible(true); } private void read(String command){ println(command); // Write to Process if (outStream != null) { System.out.println("Outstream again"); BufferedWriter writer = new BufferedWriter(new OutputStreamWriter(outStream)); try { writer.write(command); //writer.flush(); //writer.close(); } catch (IOException e1) { e1.printStackTrace(); } } // Execute Command try { exec(command); } catch (IOException e) {} inPane.setText(""); } private void exec(String command) throws IOException{ Process pro = Runtime.getRuntime().exec(command, null); inStream = pro.getInputStream(); inErrStream = pro.getErrorStream(); outStream = pro.getOutputStream(); Thread t1 = new Thread(new Runnable() { public void run() { try { String line = null; while(true){ BufferedReader in = new BufferedReader(new InputStreamReader(inStream)); while ((line = in.readLine()) != null) { println(line); } BufferedReader inErr = new BufferedReader(new InputStreamReader(inErrStream)); while ((line = inErr.readLine()) != null) { println(line); } Thread.sleep(1000); } } catch (Exception e) { e.printStackTrace(); } } }); t1.start(); } public void println(String line) { Document doc = outPane.getDocument(); try { doc.insertString(doc.getLength(), line + "\n", null); } catch (BadLocationException e) {} } public static void main(String[] args){ new Console(); } } 

我没有使用上面提到的ProcessBuilder ,因为我想区分错误和普通流。

更新于2016年8月29日

在@ArcticLord的帮助下,我们已经实现了原始问题中的要求。 现在只需要解决任何奇怪的行为,如非终止过程。 控制台有一个“停止”按钮,只需调用pro.destroy()即可。 但由于某种原因,这不适用于无限运行的进程,即垃圾邮件输出。

控制台: http : //pastebin.com/vyxfPEXC

InputStreamLineBuffer: http : //pastebin.com/TzFamwZ1

停止的示例代码:

 public class Infinity{ public static void main(String[] args){ while(true){ System.out.println("."); } } } 

停止的示例代码:

 import java.util.concurrent.TimeUnit; public class InfinitySlow{ public static void main(String[] args){ while(true){ try { TimeUnit.SECONDS.sleep(1); } catch (InterruptedException e) { e.printStackTrace(); } System.out.println("."); } } } 

您使用代码的方式正确。 你错过了一些小事。
让我们从您的read方法开始:

 private void read(String command){ [...] // Write to Process if (outStream != null) { [...] try { writer.write(command + "\n"); // add newline so your input will get proceed writer.flush(); // flush your input to your process } catch (IOException e1) { e1.printStackTrace(); } } // ELSE!! - if no outputstream is available // Execute Command else { try { exec(command); } catch (IOException e) { // Handle the exception here. Mostly this means // that the command could not get executed // because command was not found. println("Command not found: " + command); } } inPane.setText(""); } 

现在让我们修复你的exec方法。 您应该使用单独的线程来读取正常的过程输出和错误输出。 另外,我介绍了第三个线程,它等待进程结束并关闭outputStream,因此下一个用户输入不是用于进程而是一个新命令。

 private void exec(String command) throws IOException{ Process pro = Runtime.getRuntime().exec(command, null); inStream = pro.getInputStream(); inErrStream = pro.getErrorStream(); outStream = pro.getOutputStream(); // Thread that reads process output Thread outStreamReader = new Thread(new Runnable() { public void run() { try { String line = null; BufferedReader in = new BufferedReader(new InputStreamReader(inStream)); while ((line = in.readLine()) != null) { println(line); } } catch (Exception e) { e.printStackTrace(); } System.out.println("Exit reading process output"); } }); outStreamReader.start(); // Thread that reads process error output Thread errStreamReader = new Thread(new Runnable() { public void run() { try { String line = null; BufferedReader inErr = new BufferedReader(new InputStreamReader(inErrStream)); while ((line = inErr.readLine()) != null) { println(line); } } catch (Exception e) { e.printStackTrace(); } System.out.println("Exit reading error stream"); } }); errStreamReader.start(); // Thread that waits for process to end Thread exitWaiter = new Thread(new Runnable() { public void run() { try { int retValue = pro.waitFor(); println("Command exit with return value " + retValue); // close outStream outStream.close(); outStream = null; } catch (InterruptedException e) { e.printStackTrace(); } catch (IOException e) { e.printStackTrace(); } } }); exitWaiter.start(); } 

现在这应该工作。
如果输入ipconfig它将打印命令输出,关闭输出流并准备好执行新命令。
如果输入cmd它会打印输出,让您输入更多cmd命令,如dircd等,直到您输入exit 。 然后它关闭输出流并准备好接收新命令。

您可能会遇到执行python脚本的问题,因为如果没有刷新到系统管道中,则使用Java读取Process InputStream会出现问题。
请参阅此示例python脚本

 print "Input something!" str = raw_input() print "Received input is : ", str 

您可以使用Java程序运行它并输入输入,但在脚本完成之前您将看不到脚本输出。
我能找到的唯一解决方法是手动刷新脚本中的输出。

 import sys print "Input something!" sys.stdout.flush() str = raw_input() print "Received input is : ", str sys.stdout.flush() 

运行此脚本将如您所愿。
您可以在此处阅读有关此问题的更多信息

  • Java:有没有办法在执行期间运行系统命令并打印输出?
  • 为什么从Process的InputStream块读取altough数据是可用的
  • Java:除非手动刷新,否则无法从Process获取stdout数据

编辑:我刚刚找到另一个非常简单的解决方案,用于Python脚本的stdout.flush()问题。 使用python -u script.py启动它们,您不需要手动刷新。 这应该可以解决您的问题。

EDIT2:我们在评论中讨论过,使用此解决方案输出和错误Stream将会混淆,因为它们在不同的线程中运行。 这里的问题是,当错误流线程出现时,我们无法区分输出写入是否完成。 否则使用锁的经典线程调度可以处理这种情况。 但是,无论数据是否流动,我们都会持续流程直到流程完成。 所以我们需要一个机制来记录自从每个流读取最后一行以来经过了多长时间。

为此,我将介绍一个获取InputStream的类,并启动一个Thread来读取传入的数据。 此线程将每一行存储在队列中,并在流结束时停止。 另外,它保留读取最后一行并添加到Queue的时间。

 public class InputStreamLineBuffer{ private InputStream inputStream; private ConcurrentLinkedQueue lines; private long lastTimeModified; private Thread inputCatcher; private boolean isAlive; public InputStreamLineBuffer(InputStream is){ inputStream = is; lines = new ConcurrentLinkedQueue(); lastTimeModified = System.currentTimeMillis(); isAlive = false; inputCatcher = new Thread(new Runnable(){ @Override public void run() { StringBuilder sb = new StringBuilder(100); int b; try{ while ((b = inputStream.read()) != -1){ // read one char if((char)b == '\n'){ // new Line -> add to queue lines.offer(sb.toString()); sb.setLength(0); // reset StringBuilder lastTimeModified = System.currentTimeMillis(); } else sb.append((char)b); // append char to stringbuilder } } catch (IOException e){ e.printStackTrace(); } finally { isAlive = false; } }}); } // is the input reader thread alive public boolean isAlive(){ return isAlive; } // start the input reader thread public void start(){ isAlive = true; inputCatcher.start(); } // has Queue some lines public boolean hasNext(){ return lines.size() > 0; } // get next line from Queue public String getNext(){ return lines.poll(); } // how much time has elapsed since last line was read public long timeElapsed(){ return (System.currentTimeMillis() - lastTimeModified); } } 

通过这个类,我们可以将输出和错误读取线程合并为一个。 当输入读取缓冲线程存在并且没有数据时,它就会存在。 在每次运行中,它会检查自上次输出读取后是否已经过了一段时间,如果是这样,它会一次打印所有未打印的行。 与错误输出相同。 然后它睡了一些毫不浪费cpu时间。

 private void exec(String command) throws IOException{ Process pro = Runtime.getRuntime().exec(command, null); inStream = pro.getInputStream(); inErrStream = pro.getErrorStream(); outStream = pro.getOutputStream(); InputStreamLineBuffer outBuff = new InputStreamLineBuffer(inStream); InputStreamLineBuffer errBuff = new InputStreamLineBuffer(inErrStream); Thread streamReader = new Thread(new Runnable() { public void run() { // start the input reader buffer threads outBuff.start(); errBuff.start(); // while an input reader buffer thread is alive // or there are unconsumed data left while(outBuff.isAlive() || outBuff.hasNext() || errBuff.isAlive() || errBuff.hasNext()){ // get the normal output if at least 50 millis have passed if(outBuff.timeElapsed() > 50) while(outBuff.hasNext()) println(outBuff.getNext()); // get the error output if at least 50 millis have passed if(errBuff.timeElapsed() > 50) while(errBuff.hasNext()) println(errBuff.getNext()); // sleep a bit bofore next run try { Thread.sleep(100); } catch (InterruptedException e) { e.printStackTrace(); } } System.out.println("Finish reading error and output stream"); } }); streamReader.start(); // remove outStreamReader and errStreamReader Thread [...] } 

也许这不是一个完美的解决方案,但它应该处理这里的情况。


编辑(31.8.2016)
我们在评论中讨论了在实现使用Process#destroy()杀死已启动进程的停止按钮时代码仍然存在问题。 通过调用destroy()将立即destroy()产生大量输出的进程,例如在无限循环中。 但是由于它已经产生了大量必须由我们的streamReader消耗的输出,我们无法恢复正常的程序行为。
所以我们需要一些小改动:

我们将向InputStreamLineBuffer引入一个destroy()方法,该方法停止输出读取并清除队列。
更改将如下所示:

 public class InputStreamLineBuffer{ private boolean emergencyBrake = false; [...] public InputStreamLineBuffer(InputStream is){ [...] while ((b = inputStream.read()) != -1 && !emergencyBrake){ [...] } } [...] // exits immediately and clears line buffer public void destroy(){ emergencyBrake = true; lines.clear(); } } 

主程序中的一些小变化

 public class ExeConsole extends JFrame{ [...] // The line buffers must be declared outside the method InputStreamLineBuffer outBuff, errBuff; public ExeConsole{ [...] btnStop.addActionListener(new ActionListener() { public void actionPerformed(ActionEvent e) { if(pro != null){ pro.destroy(); outBuff.destroy(); errBuff.destroy(); } }}); } [...] private void exec(String command) throws IOException{ [...] //InputStreamLineBuffer outBuff = new InputStreamLineBuffer(inStream); //InputStreamLineBuffer errBuff = new InputStreamLineBuffer(inErrStream); outBuff = new InputStreamLineBuffer(inStream); errBuff = new InputStreamLineBuffer(inErrStream); [...] } } 

现在它应该能够破坏甚至一些输出垃圾邮件进程。

注意:我发现Process#destroy()无法破坏子进程。 因此,如果你在Windows上启动cmd并从那里启动一个java程序,那么当java程序仍在运行时,你最终会破坏cmd进程。 您将在任务管理器中看到它。 用java本身无法解决这个问题。 它将需要一些依赖于外部工具的操作系统来获取这些进程的pid并手动终止它们。

尽管@ArticLord解决方案很好而且整洁,但最近我遇到了同样的问题,并提出了一个概念上相同的解决方案,但在实现方面略有不同。

概念是相同的,即“批量读取”:当读者线程获得它的轮次时,它会消耗它处理的所有流,并且仅在完成时传递手。
这可以保证输出/错误的打印顺序。

但是,我没有使用基于计时器的转向分配,而是使用基于锁的非阻塞读取模拟:

 // main method for testability: replace with private void exec(String command) public static void main(String[] args) throws Exception { // create a lock that will be shared between reader threads // the lock is fair to minimize starvation possibilities ReentrantLock lock = new ReentrantLock(true); // exec the command: I use nslookup for testing on windows // because it is interactive and prints to stderr too Process p = Runtime.getRuntime().exec("nslookup"); // create a thread to handle output from process (uses a test consumer) Thread outThread = createThread(p.getInputStream(), lock, System.out::print); outThread.setName("outThread"); outThread.start(); // create a thread to handle error from process (test consumer, again) Thread errThread = createThread(p.getErrorStream(), lock, System.err::print); errThread.setName("errThread"); errThread.start(); // create a thread to handle input to process (read from stdin for testing purpose) PrintWriter writer = new PrintWriter(p.getOutputStream()); Thread inThread = createThread(System.in, null, str -> { writer.print(str); writer.flush(); }); inThread.setName("inThread"); inThread.start(); // create a thread to handle termination gracefully. Not really needed in this simple // scenario, but on a real application we don't want to block the UI until process dies Thread endThread = new Thread(() -> { try { // wait until process is done p.waitFor(); logger.debug("process exit"); // signal threads to exit outThread.interrupt(); errThread.interrupt(); inThread.interrupt(); // close process streams p.getOutputStream().close(); p.getInputStream().close(); p.getErrorStream().close(); // wait for threads to exit outThread.join(); errThread.join(); inThread.join(); logger.debug("exit"); } catch(Exception e) { throw new RuntimeException(e.getMessage(), e); } }); endThread.setName("endThread"); endThread.start(); // wait for full termination (process and related threads by cascade joins) endThread.join(); logger.debug("END"); } // convenience method to create a specific reader thread with exclusion by lock behavior private static Thread createThread(InputStream input, ReentrantLock lock, Consumer consumer) { return new Thread(() -> { // wrap input to be buffered (enables ready()) and to read chars // using explicit encoding may be relevant in some case BufferedReader reader = new BufferedReader(new InputStreamReader(input)); // create a char buffer for reading char[] buffer = new char[8192]; try { // repeat until EOF or interruption while(true) { try { // wait for your turn to bulk read if(lock != null && !lock.isHeldByCurrentThread()) { lock.lockInterruptibly(); } // when there's nothing to read, pass the hand (bulk read ended) if(!reader.ready()) { if(lock != null) { lock.unlock(); } // this enables a soft busy-waiting loop, that simultates non-blocking reads Thread.sleep(100); continue; } // perform the read, as we are sure it will not block (input is "ready") int len = reader.read(buffer); if(len == -1) { return; } // transform to string an let consumer consume it String str = new String(buffer, 0, len); consumer.accept(str); } catch(InterruptedException e) { // catch interruptions either when sleeping and waiting for lock // and restore interrupted flag (not necessary in this case, however it's a best practice) Thread.currentThread().interrupt(); return; } catch(IOException e) { throw new RuntimeException(e.getMessage(), e); } } } finally { // protect the lock against unhandled exceptions if(lock != null && lock.isHeldByCurrentThread()) { lock.unlock(); } logger.debug("exit"); } }); } 

请注意,@ ArticLord和我的两种解决方案都不是完全饥饿安全的,机会(真的很少)与消费者的速度成反比。

2016年快乐! ;)