从Java ExecutorService捕获线程exception

我正在研究用于并行计算JavaSeis.org的软件开发框架。 我需要一个强大的机制来报告线程exception。 在开发过程中,了解exception的来源具有很高的价值,因此我想在过度报告方面犯错误。 我也希望能够在线程中处理Junit4测试。 方法是否合理或是否有更好的方法?

import java.util.concurrent.Callable; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.Future; public class TestThreadFailure { public static void main(String[] args) { int size = 1; ExecutorService exec = Executors.newFixedThreadPool(size); ThreadFailTask worker = new ThreadFailTask(); Future result = exec.submit(worker); try { Integer value = result.get(); System.out.println("Result: " + value); } catch (Throwable t) { System.out.println("Caught failure: " + t.toString()); exec.shutdownNow(); System.out.println("Stack Trace:"); t.printStackTrace(); return; } throw new RuntimeException("Did not catch failure !!"); } public static class ThreadFailTask implements Callable { @Override public Integer call() { int nbuf = 65536; double[][] buf = new double[nbuf][nbuf]; return new Integer((int) buf[0][0]); } } } 

在使用submit()时,我不相信有一个标准的“钩子”来解决这些exception。 但是,如果你需要支持submit() (这听起来合理,假设你使用Callable ),你总是可以包装Callables和Runnables:

 ExecutorService executor = new ThreadPoolExecutor(1, 10, 60, TimeUnit.SECONDS, new LinkedBlockingDeque()) { @Override public  Future submit(final Callable task) { Callable wrappedTask = new Callable() { @Override public T call() throws Exception { try { return task.call(); } catch (Exception e) { System.out.println("Oh boy, something broke!"); e.printStackTrace(); throw e; } } }; return super.submit(wrappedTask); } }; 

当然,这种方法只有在你首先构建ExecutorService时才有效。 此外,请记住覆盖所有三个submit()变体。

考虑在ExecutorService上调用execute()而不是submit() 。 使用execute()调用的Thread.UncaughtExceptionHandler在失败时调用Thread.UncaughtExceptionHandler

只需创建一个ThreadFactory ,在所有Threads上安装Thread.UncaughtExceptionHandler ,然后在ExecutorService上调用execute()而不是submit()

看看这个相关的堆栈溢出问题 。

正如本线程中所解释的那样,使用ThreadPoolExecutor提交和执行方法有什么区别 ,使用execute只有在实现Runnable而不是Callable时才能工作,因为execute不能返回Future。

我认为在你的场景中你应该构建未来的对象,以便它也可以容纳exception的东西。 因此,如果出现exception,则构建错误消息对象。

我的原始问题询问如何使用Java ExecutorService实现“健壮”线程exception处理。 感谢Angelo和Greg指出exception处理如何与ExecutorService.submit()和Future.get()一起使用。 我修改后的代码片段如下所示。 我在这里学到的关键点是Future.get()捕获所有exception。 如果线程被中断或取消,则会获得相应的exception,否则,exception将被包装并作为ExecutionException重新抛出。

 import java.util.concurrent.Callable;
 import java.util.concurrent.CancellationException;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
 import java.util.concurrent.Future;

公共类TestThreadFailure {

   public static void main(String [] args){
     int size = 1;
     ExecutorService exec = Executors.newFixedThreadPool(size);
     ThreadFailTask​​ worker = new ThreadFailTask​​();
    未来结果= exec.submit(worker);
    尝试{
      整数值= result.get();
       System.out.println(“结果:”+值);
     } catch(ExecutionException ex){
       System.out.println(“Caught failure:”+ ex.toString());
       exec.shutdownNow();
      返回;
     } catch(InterruptedException iex){
       System.out.println(“Thread interrupted:”+ iex.toString());
     } catch(CancellationException cex){
       System.out.println(“Thread canceled:”+ cex.toString());
     }
     exec.shutdownNow();
    抛出新的RuntimeException(“没有捕获失败!!”);
   }

   public static class ThreadFailTask​​实现Callable {
     @覆盖
     public Integer call(){
       int nbuf = 65536;
       double [] [] buf = new double [nbuf] [nbuf];
       return new Integer((int)buf [0] [0]);
     }
   }
 }

我对其他答案没有太大的运气,因为我需要实际的exception实例本身,而不仅仅是打印的堆栈跟踪。 对我来说,涉及ThreadPoolExecutor的接受答案#Execute()的问题“ 为什么UncaughtExceptionHandler没有被ExecutorService调用? ”工作。

请参阅以下示例代码:

 List tasks = new LinkedList<>(); for (int i = 0; i < numThreads; ++i) { Runnable task = new Runnable() { @Override public void run() { throw new RuntimeException(); } }; tasks.add(task); } Optional opEmpty = Optional.empty(); /* * Use AtomicReference as a means of capturing the first thrown exception, since a * spawned thread can't "throw" an exception to the parent thread. */ final AtomicReference> firstThrownException = new AtomicReference<>(opEmpty); /* * Use new ThreadPoolExecutor instead of Executors.newFixedThreadPool() so * that I can override afterExecute() for the purposes of throwing an * exception from the test thread if a child thread fails. */ ExecutorService execSvc = new ThreadPoolExecutor(numThreads, numThreads, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue()) { @Override public void afterExecute(Runnable task, Throwable failureCause) { if(failureCause == null) { // The Runnable completed successfully. return; } // only sets the first exception because it will only be empty on the first call. firstThrownException.compareAndSet(Optional.empty(), Optional.of(failureCause)); } }; for (Runnable task : tasks) { execSvc.execute(task); } execSvc.shutdown(); execSvc.awaitTermination(1, TimeUnit.HOURS); assertEquals(firstThrownException.get(), Optional.empty()); 

要处理ExecutorService中的exception,您必须利用CallableFuture

请观看以下video了解更多详情。 希望这会对你有所帮助。

video:可调用和未来(11分钟)