如何使用invokeAll()让所有线程池完成他们的任务?

ExecutorService pool=Executors.newFixedThreadPool(7); List<Future> future=new ArrayList<Future>(); List<Callable> callList = new ArrayList<Callable>(); for(int i=0;i<=diff;i++){ String str="2013-"+(liDates.get(i).get(Calendar.MONTH)+1)+"-"+liDates.get(i).get(Calendar.DATE); callList.add(new HotelCheapestFare(str)); } future=pool.invokeAll(callList); for(int i=0;i<=future.size();i++){ System.out.println("name is:"+future.get(i).get().getName()); } 

现在我希望pool在进入for循环之前invokeAll所有的任务但是当我运行这个程序for循环时,在invokeAll之前执行并抛出此exception:

 java.util.concurrent.ExecutionException: java.lang.NullPointerException at java.util.concurrent.FutureTask$Sync.innerGet(Unknown Source) at java.util.concurrent.FutureTask.get(Unknown Source) at com.mmt.freedom.cheapestfare.TestHotel.main(TestHotel.java:6‌​5) Caused by: java.lang.NullPointerException at com.mmt.freedom.cheapestfare.HotelCheapestFare.getHotelCheap‌estFare(HotelCheapes‌​tFare.java:166) at com.mmt.freedom.cheapestfare.HotelCheapestFare.call(HotelChe‌​apestFare.java:219) at com.mmt.freedom.cheapestfare.HotelCheapestFare.call(HotelChe‌​apestFare.java:1) at java.util.concurrent.FutureTask$Sync.innerRun(Unknown Source) at java.util.concurrent.FutureTask.run(Unknown Source) at java.util.concurrent.ThreadPoolExecutor.runWorker(Unknown Source) atjava.util.concurrent.ThreadPoolExecutor$Worker.run(Unknow‌​n Source) at java.lang.Thread.run 

ExecutorService工作方式是,当您调用invokeAll它等待所有任务完成:

执行给定的任务,返回完成所有状态和结果的Futures列表。 Future.isDone()对于返回列表的每个元素都为true。 请注意,已完成的任务可能正常终止或通过抛出exception终止 。 如果在此操作正在进行时修改了给定集合,则此方法的结果未定义。 1 (强调添加)

这意味着你的任务都已完成,但有些人可能会抛出exception。 此exception是Future一部分 – 调用get会导致在ExecutionException重新包装exception。

从你堆栈跟踪

 java.util.concurrent.ExecutionException: java.lang.NullPointerException at java.util.concurrent.FutureTask$Sync.innerGet(Unknown Source) at java.util.concurrent.FutureTask.get(Unknown Source) at ^^^ <-- from get 

你可以看到确实如此。 您的一项任务因NPE而失败。 ExecutorService捕获了exception并通过在调用Future.get时抛出ExecutionException来告诉您它。

现在,如果您想完成任务,则需要ExecutorCompletionService 。 这充当了BlockingQueue ,允许您在完成任务时轮询它们。

 public static void main(String[] args) throws Exception { final ExecutorService executorService = Executors.newFixedThreadPool(10); final ExecutorCompletionService completionService = new ExecutorCompletionService<>(executorService); executorService.submit(new Runnable() { @Override public void run() { for (int i = 0; i < 100; ++i) { try { final Future myValue = completionService.take(); //do stuff with the Future final String result = myValue.get(); System.out.println(result); } catch (InterruptedException ex) { return; } catch (ExecutionException ex) { System.err.println("TASK FAILED"); } } } }); for (int i = 0; i < 100; ++i) { completionService.submit(new Callable() { @Override public String call() throws Exception { if (Math.random() > 0.5) { throw new RuntimeException("FAILED"); } return "SUCCESS"; } }); } executorService.shutdown(); } 

在这个例子中,我有一个任务调用了ExecutorCompletionService ,当它们变得可用时获取Future ,然后我将任务提交给ExecutorCompletionService

这将允许您在失败时立即获取失败的任务,而不必等待所有任务一起失败。

唯一的复杂因素是很难告诉轮询线程所有任务都已完成,因为现在所有任务都是异步的。 在这个例子中,我使用了将提交100个任务的知识,因此它只需要轮询100次。 更通用的方法是从submit方法中收集Future ,然后循环遍历它们以查看是否所有内容都已完成。

Future.get()抛出exception。

CancellationException – 如果计算被取消

ExecutionException – 如果计算引发exception

InterruptedException – 如果当前线程在等待时被中断

调用get()方法时捕获所有这些exception。

我已经模拟了一些Callable任务的除零exception,但如果你捕获上面三个例外,如示例代码所示,一个Callable中的exception不会影响提交给ExecutorService其他Callable任务。

示例代码段:

 import java.util.concurrent.*; import java.util.*; public class InvokeAllUsage{ public InvokeAllUsage(){ System.out.println("creating service"); ExecutorService service = Executors.newFixedThreadPool(10); List futureList = new ArrayList(); for ( int i=0; i<10; i++){ MyCallable myCallable = new MyCallable((long)i+1); futureList.add(myCallable); } System.out.println("Start"); try{ List> futures = service.invokeAll(futureList); for(Future future : futures){ try{ System.out.println("future.isDone = " + future.isDone()); System.out.println("future: call ="+future.get()); } catch (CancellationException ce) { ce.printStackTrace(); } catch (ExecutionException ee) { ee.printStackTrace(); } catch (InterruptedException ie) { Thread.currentThread().interrupt(); // ignore/reset } } }catch(Exception err){ err.printStackTrace(); } System.out.println("Completed"); service.shutdown(); } public static void main(String args[]){ InvokeAllUsage demo = new InvokeAllUsage(); } class MyCallable implements Callable{ Long id = 0L; public MyCallable(Long val){ this.id = val; } public Long call(){ if ( id % 5 == 0){ id = id / 0; } return id; } } } 

输出:

 creating service Start future.isDone = true future: call =1 future.isDone = true future: call =2 future.isDone = true future: call =3 future.isDone = true future: call =4 future.isDone = true java.util.concurrent.ExecutionException: java.lang.ArithmeticException: / by zero at java.util.concurrent.FutureTask.report(FutureTask.java:122) at java.util.concurrent.FutureTask.get(FutureTask.java:188) at InvokeAllUsage.(InvokeAllUsage.java:20) at InvokeAllUsage.main(InvokeAllUsage.java:37) Caused by: java.lang.ArithmeticException: / by zero at InvokeAllUsage$MyCallable.call(InvokeAllUsage.java:47) at InvokeAllUsage$MyCallable.call(InvokeAllUsage.java:39) at java.util.concurrent.FutureTask.run(FutureTask.java:262) at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145) at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615) at java.lang.Thread.run(Thread.java:744) future.isDone = true future: call =6 future.isDone = true future: call =7 future.isDone = true future: call =8 future.isDone = true future: call =9 future.isDone = true java.util.concurrent.ExecutionException: java.lang.ArithmeticException: / by zero at java.util.concurrent.FutureTask.report(FutureTask.java:122) at java.util.concurrent.FutureTask.get(FutureTask.java:188) at InvokeAllUsage.(InvokeAllUsage.java:20) at InvokeAllUsage.main(InvokeAllUsage.java:37) Caused by: java.lang.ArithmeticException: / by zero at InvokeAllUsage$MyCallable.call(InvokeAllUsage.java:47) at InvokeAllUsage$MyCallable.call(InvokeAllUsage.java:39) at java.util.concurrent.FutureTask.run(FutureTask.java:262) at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145) at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615) at java.lang.Thread.run(Thread.java:744) Completed 

invokeAll是一种阻塞方法。 这意味着 – 在完成所有线程之前,JVM不会进入下一行。 所以我认为你的线程未来结果有问题。

 System.out.println("name is:"+future.get(i).get().getName()); 

从这一行我认为有一些期货没有结果可以为空,所以你应该检查你的代码,如果有一些期货null,如果是这样,在执行此行之前得到一个if。