如何等待ThreadPoolExecutor完成

我的问题:如何在ThreadPoolExecutor上执行一堆线程对象并等待它们全部完成然后再继续?

我是ThreadPoolExecutor的新手。 所以这段代码是一个测试它是如何工作的测试。 现在我甚至没有用对象填充BlockingQueue ,因为我不理解如何在不使用另一个RunnableObject调用execute()情况下启动队列。 无论如何,现在我只是打电话给awaitTermination()但我想我仍然缺少一些东西。 任何提示都会很棒! 谢谢。

 public void testThreadPoolExecutor() throws InterruptedException { int limit = 20; BlockingQueue q = new ArrayBlockingQueue(limit); ThreadPoolExecutor ex = new ThreadPoolExecutor(limit, limit, 20, TimeUnit.SECONDS, q); for (int i = 0; i < limit; i++) { ex.execute(new RunnableObject(i + 1)); } ex.awaitTermination(2, TimeUnit.SECONDS); System.out.println("finished"); } 

RunnableObject类:

 package playground; public class RunnableObject implements Runnable { private final int id; public RunnableObject(int id) { this.id = id; } @Override public void run() { System.out.println("ID: " + id + " started"); try { Thread.sleep(2354); } catch (InterruptedException ignore) { } System.out.println("ID: " + id + " ended"); } } 

你应该循环awaitTermination

 ExecutorService threads; // ... // Tell threads to finish off. threads.shutdown(); // Wait for everything to finish. while (!threads.awaitTermination(10, TimeUnit.SECONDS)) { log.info("Awaiting completion of threads."); } 

您的问题似乎是在将所有作业提交到池后,您没有调用shutdown 。 如果没有shutdown()你的awaitTermination将始终返回false。

 ThreadPoolExecutor ex = new ThreadPoolExecutor(limit, limit, 20, TimeUnit.SECONDS, q); for (int i = 0; i < limit; i++) { ex.execute(new RunnableObject(i + 1)); } // you are missing this line!! ex.shutdown(); ex.awaitTermination(2, TimeUnit.SECONDS); 

您还可以执行以下操作以等待所有作业完成:

 List> futures = new ArrayList>(); for (int i = 0; i < limit; i++) { futures.add(ex.submit(new RunnableObject(i + 1), (Object)null)); } for (Future future : futures) { // this joins with the submitted job future.get(); } ... // still need to shutdown at the end ex.shutdown(); 

此外,因为您正在hibernate2354毫秒,但只等待2 SECONDS的所有作业终止, awaitTermination将始终返回false

最后,听起来你担心创建一个新的ThreadPoolExecutor而你想要重用第一个。 不要。 与您编写的用于检测作业是否完成的任何代码相比,GC开销将极小。


引用javadocs, ThreadPoolExecutor.shutdown()

启动有序关闭,其中先前提交的任务将被执行,但不会接受任何新任务。 如果已经关闭,调用没有其他影响。

ThreadPoolExecutor.awaitTermination(...)方法中,它正在等待执行程序的状态转到TERMINATED 。 但首先,如果调用shutdown()则状态必须转到SHUTDOWN如果调用shutdown()则必须转到STOP

这与执行者本身无关。 只需使用接口的java.util.concurrent.ExecutorService.invokeAll(Collection>) 。 它将阻塞,直到所有Callable完成。

执行者意味着长寿; 超出一组任务的生命周期。 shutdown是在应用程序完成和清理时。

以下是在接受InterruptedException时触发重试的已接受答案的变体:

 executor.shutdown(); boolean isWait = true; while (isWait) { try { isWait = !executor.awaitTermination(10, TimeUnit.SECONDS); if (isWait) { log.info("Awaiting completion of bulk callback threads."); } } catch (InterruptedException e) { log.debug("Interruped while awaiting completion of callback threads - trying again..."); } } 

另一种方法是使用CompletionService,如果您必须尝试任何任务结果,这非常有用:

 //run 3 task at time final int numParallelThreads = 3; //I used newFixedThreadPool for convenience but if you need you can use ThreadPoolExecutor ExecutorService executor = Executors.newFixedThreadPool(numParallelThreads); CompletionService completionService = new ExecutorCompletionService(executor); int numTaskToStart = 15; for(int i=0; i (or something you need) MyTask mt = new MyTask(); completionService.submit(mt); } executor.shutdown(); //it cannot be queued more task try { for (int t = 0; t < numTaskToStart ; t++) { Future f = completionService.take(); String result = f.get(); // ... something to do ... } } catch (InterruptedException e) { //termination of all started tasks (it returns all not started tasks in queue) executor.shutdownNow(); } catch (ExecutionException e) { // ... something to catch ... } 

尝试这个,

 ThreadPoolExecutor ex = new ThreadPoolExecutor(limit, limit, 20, TimeUnit.SECONDS, q); for (int i = 0; i < limit; i++) { ex.execute(new RunnableObject(i + 1)); } 

要添加的行

 ex.shutdown(); ex.awaitTermination(timeout, unit)