ThreadPool of CLI processes

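I need to pass messages from Java to CLI PHP processes via stdin. I'd like to keep about 20 PHP processes running in a pool, so that when I pass a message to the pool, it hands each message to a separate thread and keeps a queue of messages waiting to be delivered. I'd like these PHP processes to stay alive as long as possible, with a new one brought up if one dies. I looked at doing this with a static thread pool, but that seems better suited to tasks that execute and then simply die. How could I do this with a simple interface for passing a message to the pool? Will I have to implement my own custom "thread pool"?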

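I am providing some code because I think it will make things clearer. Basically, you need to keep a pool of Process objects around. Keep in mind that each of these processes has an input, output and error stream that you need to manage in some way. In my example I just redirect the error and output streams to the main process's console. You can set up callbacks and handlers to obtain the output of the PHP program if needed. If you are just processing tasks and don't care what PHP says, leave it as is or redirect to a file.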
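If you do want the output, the callback idea could look roughly like the sketch below. It assumes Java 8 or later, and `LineCallbackReader` and `attach` are hypothetical names that are not part of the code further down. It simply drains a stream on its own thread and hands each line to a handler.

```java
import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStream;
import java.io.InputStreamReader;
import java.io.UncheckedIOException;
import java.nio.charset.StandardCharsets;
import java.util.function.Consumer;

/** Hypothetical helper: forwards each line read from a stream to a callback. */
final class LineCallbackReader implements Runnable {
    private final InputStream in;
    private final Consumer<String> onLine;

    LineCallbackReader(InputStream in, Consumer<String> onLine) {
        this.in = in;
        this.onLine = onLine;
    }

    @Override
    public void run() {
        try (BufferedReader reader = new BufferedReader(
                new InputStreamReader(in, StandardCharsets.UTF_8))) {
            String line;
            // readLine() returns null once the stream is closed (e.g. the process exited).
            while ((line = reader.readLine()) != null) {
                onLine.accept(line);
            }
        } catch (IOException ex) {
            throw new UncheckedIOException(ex);
        }
    }

    /** Starts a daemon thread that drains the stream into the callback. */
    static Thread attach(InputStream in, Consumer<String> onLine) {
        Thread t = new Thread(new LineCallbackReader(in, onLine), "line-callback-reader");
        t.setDaemon(true);
        t.start();
        return t;
    }
}
```

You would call something like `LineCallbackReader.attach(process.getInputStream(), line -> handle(line))` right after starting the process, where `handle` is whatever your application does with a line. This only works if the process's stdout is left as a pipe (the default) rather than redirected with `Redirect.INHERIT`.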

I am using the Apache Commons Pool library for the ObjectPool. No need to reinvent one.

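You'll have a pool of 20 processes running your PHP program. That alone will not get you what you need. You will probably want to process tasks against all 20 of these processes "at the same time", so you'll also need a ThreadPool whose threads pull a Process from your ObjectPool.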

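You also need to understand that if you kill, or CTRL-C, your Java process, the init process will take over your PHP processes and they will just sit there. You'll probably want to keep a log of the PIDs of all the PHP processes you spawn, and then clean them up when you re-run your Java program.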
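A rough sketch of such a PID log follows. It assumes Java 9 or later, because it relies on `Process.pid()` and `ProcessHandle`, which the JDK 6/7 code in this answer does not have. `PidLog` and its methods are hypothetical names made up for illustration.

```java
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardOpenOption;
import java.util.ArrayList;
import java.util.List;

/** Hypothetical helper: remembers child PIDs in a file so a later run can clean up. */
final class PidLog {
    private final Path file;

    PidLog(Path file) {
        this.file = file;
    }

    /** Appends one PID per line; call it right after builder.start(). */
    synchronized void record(long pid) throws IOException {
        Files.write(file,
                (pid + System.lineSeparator()).getBytes(StandardCharsets.UTF_8),
                StandardOpenOption.CREATE, StandardOpenOption.APPEND);
    }

    /** Returns the PIDs recorded so far (empty if the file does not exist). */
    synchronized List<Long> readAll() throws IOException {
        List<Long> pids = new ArrayList<>();
        if (!Files.exists(file)) {
            return pids;
        }
        for (String line : Files.readAllLines(file, StandardCharsets.UTF_8)) {
            String trimmed = line.trim();
            if (!trimmed.isEmpty()) {
                pids.add(Long.parseLong(trimmed));
            }
        }
        return pids;
    }

    /** Asks every recorded process that still exists to terminate, then clears the log. */
    synchronized void cleanUp() throws IOException {
        for (long pid : readAll()) {
            ProcessHandle.of(pid).ifPresent(ProcessHandle::destroy);
        }
        Files.deleteIfExists(file);
    }
}
```

In the factory you would call `pidLog.record(process.pid())` after `builder.start()`, and call `pidLog.cleanUp()` once at startup. Be aware that the operating system can reuse PIDs, so a stale entry may point at an unrelated process; checking `ProcessHandle.info().command()` before destroying is one way to guard against that.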

    import java.io.OutputStream;
    import java.lang.ProcessBuilder.Redirect;
    import java.nio.charset.Charset;
    import java.util.concurrent.ArrayBlockingQueue;
    import java.util.concurrent.BlockingQueue;
    import java.util.concurrent.ThreadPoolExecutor;
    import java.util.concurrent.TimeUnit;
    import java.util.logging.Level;
    import java.util.logging.Logger;

    // Apache Commons Pool 1.x
    import org.apache.commons.pool.BasePoolableObjectFactory;
    import org.apache.commons.pool.ObjectPool;
    import org.apache.commons.pool.impl.GenericObjectPool;

    public class StackOverflow_10037379 {

        private static Logger sLogger = Logger.getLogger(StackOverflow_10037379.class.getName());

        public static class CLIPoolableObjectFactory extends BasePoolableObjectFactory<Process> {

            private String mProcessToRun;

            public CLIPoolableObjectFactory(String processToRun) {
                mProcessToRun = processToRun;
            }

            @Override
            public Process makeObject() throws Exception {
                ProcessBuilder builder = new ProcessBuilder();
                builder.redirectError(Redirect.INHERIT);
                // I am being lazy, but really the InputStream is where
                // you can get any output of the PHP Process. This setting
                // will make it output to the current process's console.
                builder.redirectOutput(Redirect.INHERIT);
                builder.redirectInput(Redirect.PIPE);
                builder.command(mProcessToRun);
                return builder.start();
            }

            @Override
            public boolean validateObject(Process process) {
                try {
                    // exitValue() throws while the process is still running.
                    process.exitValue();
                    return false;
                } catch (IllegalThreadStateException ex) {
                    return true;
                }
            }

            @Override
            public void destroyObject(Process process) throws Exception {
                // If PHP has a way to stop it, do that instead of destroy
                process.destroy();
            }

            @Override
            public void passivateObject(Process process) throws Exception {
                // Should really try to read from the InputStream of the Process
                // to prevent lock-ups if Redirect.INHERIT is not used.
            }
        }

        public static class CLIWorkItem implements Runnable {

            private ObjectPool<Process> mPool;
            private String mWork;

            public CLIWorkItem(ObjectPool<Process> pool, String work) {
                mPool = pool;
                mWork = work;
            }

            @Override
            public void run() {
                Process workProcess = null;
                try {
                    workProcess = mPool.borrowObject();
                    OutputStream os = workProcess.getOutputStream();
                    os.write(mWork.getBytes(Charset.forName("UTF-8")));
                    os.flush();
                    // Because of the INHERIT rule with the output stream
                    // the console stream overwrites itself. REMOVE THIS in production.
                    Thread.sleep(100);
                } catch (Exception ex) {
                    sLogger.log(Level.SEVERE, null, ex);
                } finally {
                    if (workProcess != null) {
                        try {
                            // Seriously.. so many exceptions.
                            mPool.returnObject(workProcess);
                        } catch (Exception ex) {
                            sLogger.log(Level.SEVERE, null, ex);
                        }
                    }
                }
            }
        }

        public static void main(String[] args) throws Exception {
            // Change the 5 to 20 in your case.
            // Also change mock_php.exe to /usr/bin/php or wherever.
            GenericObjectPool<Process> pool = new GenericObjectPool<>(
                    new CLIPoolableObjectFactory("mock_php.exe"), 5);
            // validateObject() is only consulted when a test* flag is set;
            // with this, a dead process is replaced when it is borrowed.
            pool.setTestOnBorrow(true);

            // This will only allow you to queue 100 work items at a time. I would suspect
            // that if you only want 20 PHP processes running at a time and this queue
            // filled up you'll need to implement some other strategy as you are doing
            // more work than PHP can keep up with. You'll need to block at some point
            // or throw work away.
            BlockingQueue<Runnable> queue = new ArrayBlockingQueue<>(100, true);
            ThreadPoolExecutor executor = new ThreadPoolExecutor(20, 20, 1, TimeUnit.HOURS, queue);

            // print some stuff out.
            for (int i = 1; i <= 11; i++) {
                executor.execute(new CLIWorkItem(pool, "Message " + i + "\r\n"));
            }

            executor.shutdown();
            executor.awaitTermination(4000, TimeUnit.HOURS);
            pool.close();
        }
    }

Output of the program run:

    12172 - Message 2
    10568 - Message 1
    4804 - Message 3
    11916 - Message 4
    11116 - Message 5
    12172 - Message 6
    4804 - Message 7
    10568 - Message 8
    11916 - Message 9
    11116 - Message 10
    12172 - Message 11

Code for the C++ program that just echoes whatever it reads:

    #include <windows.h>
    #include <iostream>
    #include <string>

    int main(int argc, char* argv[]) {
        DWORD pid = GetCurrentProcessId();
        std::string line;
        // Stop at EOF instead of spinning once stdin is closed.
        while (std::getline(std::cin, line)) {
            std::cout << pid << " - " << line << std::endl;
        }
        return 0;
    }
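On the bounded work queue in the Java code above: with the default rejection policy, `ThreadPoolExecutor` throws `RejectedExecutionException` once all threads are busy and the queue is full. One way to "block at some point" is `ThreadPoolExecutor.CallerRunsPolicy`, which makes the submitting thread run the task itself and so slows the producer down. This is only a sketch of that one option; `BoundedExecutors` and `callerRuns` are hypothetical names.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

/** Hypothetical factory for an executor that applies back-pressure instead of rejecting. */
final class BoundedExecutors {

    private BoundedExecutors() {
    }

    /**
     * Fixed-size executor with a bounded, fair queue. When the queue is full,
     * the thread calling execute() runs the task itself.
     */
    static ThreadPoolExecutor callerRuns(int threads, int queueCapacity) {
        return new ThreadPoolExecutor(
                threads, threads, 1, TimeUnit.HOURS,
                new ArrayBlockingQueue<Runnable>(queueCapacity, true),
                new ThreadPoolExecutor.CallerRunsPolicy());
    }
}
```

Swapping `new ThreadPoolExecutor(20, 20, 1, TimeUnit.HOURS, queue)` for `BoundedExecutors.callerRuns(20, 100)` would keep the same sizes. If you would rather throw work away, `ThreadPoolExecutor.DiscardPolicy` and `DiscardOldestPolicy` are the built-in alternatives.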

Update

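Sorry for the delay. Here is a JDK 6 version for anyone interested. You have to run a separate thread to read everything from the process's InputStream. I've set this code up to spawn a new thread alongside each new process. That thread keeps reading from the process for as long as it is alive. Instead of writing directly to a file, I set it up to use the Logging framework. That way you can configure logging to go to a file, roll over, go to the console and so on, without the destination being hard-coded.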

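You'll notice I only start a single Gobbler per process, even though a Process has both stdout and stderr. I redirect stderr to stdout just to make things easier. Apparently JDK 6 only supports this type of redirect.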

    import java.io.BufferedReader;
    import java.io.IOException;
    import java.io.InputStream;
    import java.io.InputStreamReader;
    import java.io.OutputStream;
    import java.nio.charset.Charset;
    import java.util.concurrent.ArrayBlockingQueue;
    import java.util.concurrent.BlockingQueue;
    import java.util.concurrent.ThreadPoolExecutor;
    import java.util.concurrent.TimeUnit;
    import java.util.logging.Level;
    import java.util.logging.Logger;

    // Apache Commons Pool 1.x
    import org.apache.commons.pool.BasePoolableObjectFactory;
    import org.apache.commons.pool.ObjectPool;
    import org.apache.commons.pool.impl.GenericObjectPool;

    public class StackOverflow_10037379_jdk6 {

        private static Logger sLogger = Logger.getLogger(StackOverflow_10037379_jdk6.class.getName());

        // Shamelessly taken from Google and modified.
        // I don't know who the original Author is.
        public static class StreamGobbler extends Thread {

            InputStream is;
            Logger logger;
            Level level;

            StreamGobbler(String logName, Level level, InputStream is) {
                this.is = is;
                this.logger = Logger.getLogger(logName);
                this.level = level;
            }

            @Override
            public void run() {
                try {
                    InputStreamReader isr = new InputStreamReader(is);
                    BufferedReader br = new BufferedReader(isr);
                    String line = null;
                    // readLine() returns null once the process closes its output.
                    while ((line = br.readLine()) != null) {
                        logger.log(level, line);
                    }
                } catch (IOException ex) {
                    logger.log(Level.SEVERE, "Failed to read from Process.", ex);
                }
                logger.log(
                        Level.INFO,
                        String.format("Exiting Gobbler for %s.", logger.getName()));
            }
        }

        public static class CLIPoolableObjectFactory extends BasePoolableObjectFactory<Process> {

            private String mProcessToRun;

            public CLIPoolableObjectFactory(String processToRun) {
                mProcessToRun = processToRun;
            }

            @Override
            public Process makeObject() throws Exception {
                ProcessBuilder builder = new ProcessBuilder();
                // Merge stderr into stdout so one gobbler per process is enough.
                builder.redirectErrorStream(true);
                builder.command(mProcessToRun);
                Process process = builder.start();
                StreamGobbler loggingGobbler = new StreamGobbler(
                        String.format("process.%s", process.hashCode()),
                        Level.INFO,
                        process.getInputStream());
                loggingGobbler.start();
                return process;
            }

            @Override
            public boolean validateObject(Process process) {
                try {
                    // exitValue() throws while the process is still running.
                    process.exitValue();
                    return false;
                } catch (IllegalThreadStateException ex) {
                    return true;
                }
            }

            @Override
            public void destroyObject(Process process) throws Exception {
                // If PHP has a way to stop it, do that instead of destroy
                process.destroy();
            }

            @Override
            public void passivateObject(Process process) throws Exception {
                // Nothing to do: the StreamGobbler already drains the
                // InputStream of the Process, which prevents lock-ups.
            }
        }

        public static class CLIWorkItem implements Runnable {

            private ObjectPool<Process> mPool;
            private String mWork;

            public CLIWorkItem(ObjectPool<Process> pool, String work) {
                mPool = pool;
                mWork = work;
            }

            @Override
            public void run() {
                Process workProcess = null;
                try {
                    workProcess = mPool.borrowObject();
                    OutputStream os = workProcess.getOutputStream();
                    os.write(mWork.getBytes(Charset.forName("UTF-8")));
                    os.flush();
                    // Carried over from the version above. REMOVE THIS in production.
                    Thread.sleep(100);
                } catch (Exception ex) {
                    sLogger.log(Level.SEVERE, null, ex);
                } finally {
                    if (workProcess != null) {
                        try {
                            // Seriously.. so many exceptions.
                            mPool.returnObject(workProcess);
                        } catch (Exception ex) {
                            sLogger.log(Level.SEVERE, null, ex);
                        }
                    }
                }
            }
        }

        public static void main(String[] args) throws Exception {
            // Change the 5 to 20 in your case.
            GenericObjectPool<Process> pool = new GenericObjectPool<Process>(
                    new CLIPoolableObjectFactory("mock_php.exe"), 5);
            // validateObject() is only consulted when a test* flag is set;
            // with this, a dead process is replaced when it is borrowed.
            pool.setTestOnBorrow(true);

            BlockingQueue<Runnable> queue = new ArrayBlockingQueue<Runnable>(100, true);
            ThreadPoolExecutor executor = new ThreadPoolExecutor(20, 20, 1, TimeUnit.HOURS, queue);

            // print some stuff out.
            for (int i = 1; i <= 11; i++) {
                executor.execute(new CLIWorkItem(pool, "Message " + i + "\r\n"));
            }

            executor.shutdown();
            executor.awaitTermination(4000, TimeUnit.HOURS);
            pool.close();
        }
    }

Output

    Apr 11, 2012 8:41:02 PM stackoverflow.StackOverflow_10037379_jdk6$StreamGobbler run
    INFO: 9440 - Message 3
    Apr 11, 2012 8:41:02 PM stackoverflow.StackOverflow_10037379_jdk6$StreamGobbler run
    INFO: 8776 - Message 2
    Apr 11, 2012 8:41:02 PM stackoverflow.StackOverflow_10037379_jdk6$StreamGobbler run
    INFO: 6100 - Message 1
    Apr 11, 2012 8:41:02 PM stackoverflow.StackOverflow_10037379_jdk6$StreamGobbler run
    INFO: 10096 - Message 4
    Apr 11, 2012 8:41:02 PM stackoverflow.StackOverflow_10037379_jdk6$StreamGobbler run
    INFO: 8868 - Message 5
    Apr 11, 2012 8:41:02 PM stackoverflow.StackOverflow_10037379_jdk6$StreamGobbler run
    INFO: 8868 - Message 8
    Apr 11, 2012 8:41:02 PM stackoverflow.StackOverflow_10037379_jdk6$StreamGobbler run
    INFO: 6100 - Message 10
    Apr 11, 2012 8:41:02 PM stackoverflow.StackOverflow_10037379_jdk6$StreamGobbler run
    INFO: 8776 - Message 9
    Apr 11, 2012 8:41:02 PM stackoverflow.StackOverflow_10037379_jdk6$StreamGobbler run
    INFO: 10096 - Message 6
    Apr 11, 2012 8:41:02 PM stackoverflow.StackOverflow_10037379_jdk6$StreamGobbler run
    INFO: 9440 - Message 7
    Apr 11, 2012 8:41:02 PM stackoverflow.StackOverflow_10037379_jdk6$StreamGobbler run
    INFO: 6100 - Message 11
    Apr 11, 2012 8:41:02 PM stackoverflow.StackOverflow_10037379_jdk6$StreamGobbler run
    INFO: Exiting Gobbler for process.295131993.
    Apr 11, 2012 8:41:02 PM stackoverflow.StackOverflow_10037379_jdk6$StreamGobbler run
    INFO: Exiting Gobbler for process.756434719.
    Apr 11, 2012 8:41:02 PM stackoverflow.StackOverflow_10037379_jdk6$StreamGobbler run
    INFO: Exiting Gobbler for process.332711452.
    Apr 11, 2012 8:41:02 PM stackoverflow.StackOverflow_10037379_jdk6$StreamGobbler run
    INFO: Exiting Gobbler for process.1981440623.
    Apr 11, 2012 8:41:02 PM stackoverflow.StackOverflow_10037379_jdk6$StreamGobbler run
    INFO: Exiting Gobbler for process.1043636732.

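Your best bet here is to use the pcntl functions to fork processes, but communication between processes is difficult. I would recommend creating a queue that your processes can read from, rather than trying to pass messages on the command line.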

Beanstalk has several PHP clients that you can use to handle the messaging between processes.