如何使用invokeAll()让所有线程池完成他们的任务?
ExecutorService pool=Executors.newFixedThreadPool(7); List<Future> future=new ArrayList<Future>(); List<Callable> callList = new ArrayList<Callable>(); for(int i=0;i<=diff;i++){ String str="2013-"+(liDates.get(i).get(Calendar.MONTH)+1)+"-"+liDates.get(i).get(Calendar.DATE); callList.add(new HotelCheapestFare(str)); } future=pool.invokeAll(callList); for(int i=0;i<=future.size();i++){ System.out.println("name is:"+future.get(i).get().getName()); }
现在我希望pool在进入for循环之前invokeAll
所有的任务但是当我运行这个程序for循环时,在invokeAll
之前执行并抛出此exception:
java.util.concurrent.ExecutionException: java.lang.NullPointerException at java.util.concurrent.FutureTask$Sync.innerGet(Unknown Source) at java.util.concurrent.FutureTask.get(Unknown Source) at com.mmt.freedom.cheapestfare.TestHotel.main(TestHotel.java:65) Caused by: java.lang.NullPointerException at com.mmt.freedom.cheapestfare.HotelCheapestFare.getHotelCheapestFare(HotelCheapestFare.java:166) at com.mmt.freedom.cheapestfare.HotelCheapestFare.call(HotelCheapestFare.java:219) at com.mmt.freedom.cheapestfare.HotelCheapestFare.call(HotelCheapestFare.java:1) at java.util.concurrent.FutureTask$Sync.innerRun(Unknown Source) at java.util.concurrent.FutureTask.run(Unknown Source) at java.util.concurrent.ThreadPoolExecutor.runWorker(Unknown Source) atjava.util.concurrent.ThreadPoolExecutor$Worker.run(Unknown Source) at java.lang.Thread.run
ExecutorService
工作方式是,当您调用invokeAll
它等待所有任务完成:
执行给定的任务,返回完成所有状态和结果的Futures列表。 Future.isDone()对于返回列表的每个元素都为true。 请注意,已完成的任务可能正常终止或通过抛出exception终止 。 如果在此操作正在进行时修改了给定集合,则此方法的结果未定义。 1 (强调添加)
这意味着你的任务都已完成,但有些人可能会抛出exception。 此exception是Future
一部分 – 调用get
会导致在ExecutionException
重新包装exception。
从你堆栈跟踪
java.util.concurrent.ExecutionException: java.lang.NullPointerException at java.util.concurrent.FutureTask$Sync.innerGet(Unknown Source) at java.util.concurrent.FutureTask.get(Unknown Source) at ^^^ <-- from get
你可以看到确实如此。 您的一项任务因NPE而失败。 ExecutorService
捕获了exception并通过在调用Future.get
时抛出ExecutionException
来告诉您它。
现在,如果您想完成任务,则需要ExecutorCompletionService
。 这充当了BlockingQueue
,允许您在完成任务时轮询它们。
public static void main(String[] args) throws Exception { final ExecutorService executorService = Executors.newFixedThreadPool(10); final ExecutorCompletionService completionService = new ExecutorCompletionService<>(executorService); executorService.submit(new Runnable() { @Override public void run() { for (int i = 0; i < 100; ++i) { try { final Future myValue = completionService.take(); //do stuff with the Future final String result = myValue.get(); System.out.println(result); } catch (InterruptedException ex) { return; } catch (ExecutionException ex) { System.err.println("TASK FAILED"); } } } }); for (int i = 0; i < 100; ++i) { completionService.submit(new Callable () { @Override public String call() throws Exception { if (Math.random() > 0.5) { throw new RuntimeException("FAILED"); } return "SUCCESS"; } }); } executorService.shutdown(); }
在这个例子中,我有一个任务调用了ExecutorCompletionService
,当它们变得可用时获取Future
,然后我将任务提交给ExecutorCompletionService
。
这将允许您在失败时立即获取失败的任务,而不必等待所有任务一起失败。
唯一的复杂因素是很难告诉轮询线程所有任务都已完成,因为现在所有任务都是异步的。 在这个例子中,我使用了将提交100个任务的知识,因此它只需要轮询100次。 更通用的方法是从submit
方法中收集Future
,然后循环遍历它们以查看是否所有内容都已完成。
Future.get()抛出exception。
CancellationException
– 如果计算被取消
ExecutionException
– 如果计算引发exception
InterruptedException
– 如果当前线程在等待时被中断
调用get()
方法时捕获所有这些exception。
我已经模拟了一些Callable
任务的除零exception,但如果你捕获上面三个例外,如示例代码所示,一个Callable
中的exception不会影响提交给ExecutorService
其他Callable
任务。
示例代码段:
import java.util.concurrent.*; import java.util.*; public class InvokeAllUsage{ public InvokeAllUsage(){ System.out.println("creating service"); ExecutorService service = Executors.newFixedThreadPool(10); List futureList = new ArrayList (); for ( int i=0; i<10; i++){ MyCallable myCallable = new MyCallable((long)i+1); futureList.add(myCallable); } System.out.println("Start"); try{ List> futures = service.invokeAll(futureList); for(Future future : futures){ try{ System.out.println("future.isDone = " + future.isDone()); System.out.println("future: call ="+future.get()); } catch (CancellationException ce) { ce.printStackTrace(); } catch (ExecutionException ee) { ee.printStackTrace(); } catch (InterruptedException ie) { Thread.currentThread().interrupt(); // ignore/reset } } }catch(Exception err){ err.printStackTrace(); } System.out.println("Completed"); service.shutdown(); } public static void main(String args[]){ InvokeAllUsage demo = new InvokeAllUsage(); } class MyCallable implements Callable { Long id = 0L; public MyCallable(Long val){ this.id = val; } public Long call(){ if ( id % 5 == 0){ id = id / 0; } return id; } } }
输出:
creating service Start future.isDone = true future: call =1 future.isDone = true future: call =2 future.isDone = true future: call =3 future.isDone = true future: call =4 future.isDone = true java.util.concurrent.ExecutionException: java.lang.ArithmeticException: / by zero at java.util.concurrent.FutureTask.report(FutureTask.java:122) at java.util.concurrent.FutureTask.get(FutureTask.java:188) at InvokeAllUsage.(InvokeAllUsage.java:20) at InvokeAllUsage.main(InvokeAllUsage.java:37) Caused by: java.lang.ArithmeticException: / by zero at InvokeAllUsage$MyCallable.call(InvokeAllUsage.java:47) at InvokeAllUsage$MyCallable.call(InvokeAllUsage.java:39) at java.util.concurrent.FutureTask.run(FutureTask.java:262) at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145) at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615) at java.lang.Thread.run(Thread.java:744) future.isDone = true future: call =6 future.isDone = true future: call =7 future.isDone = true future: call =8 future.isDone = true future: call =9 future.isDone = true java.util.concurrent.ExecutionException: java.lang.ArithmeticException: / by zero at java.util.concurrent.FutureTask.report(FutureTask.java:122) at java.util.concurrent.FutureTask.get(FutureTask.java:188) at InvokeAllUsage. (InvokeAllUsage.java:20) at InvokeAllUsage.main(InvokeAllUsage.java:37) Caused by: java.lang.ArithmeticException: / by zero at InvokeAllUsage$MyCallable.call(InvokeAllUsage.java:47) at InvokeAllUsage$MyCallable.call(InvokeAllUsage.java:39) at java.util.concurrent.FutureTask.run(FutureTask.java:262) at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145) at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615) at java.lang.Thread.run(Thread.java:744) Completed
invokeAll是一种阻塞方法。 这意味着 – 在完成所有线程之前,JVM不会进入下一行。 所以我认为你的线程未来结果有问题。
System.out.println("name is:"+future.get(i).get().getName());
从这一行我认为有一些期货没有结果可以为空,所以你应该检查你的代码,如果有一些期货null,如果是这样,在执行此行之前得到一个if。