处理ThreadPoolExecutor的exception

我有以下代码片段,它基本上扫描需要执行的任务列表,然后将每个任务提供给执行程序执行。

JobExecutor反过来创建另一个执行程序(用于执行数据库内容…读取和写入数据到队列)并完成任务。

JobExecutor为提交的任务返回Future 。 当其中一个任务失败时,我想优雅地中断所有线程并通过捕获所有exception来关闭执行程序。 我需要做哪些改变?

 public class DataMovingClass { private static final AtomicInteger uniqueId = new AtomicInteger(0); private static final ThreadLocal uniqueNumber = new IDGenerator(); ThreadPoolExecutor threadPoolExecutor = null ; private List sources = new ArrayList(); private static class IDGenerator extends ThreadLocal { @Override public Integer get() { return uniqueId.incrementAndGet(); } } public void init(){ // load sources list } public boolean execute() { boolean succcess = true ; threadPoolExecutor = new ThreadPoolExecutor(10,10, 10, TimeUnit.SECONDS, new ArrayBlockingQueue(1024), new ThreadFactory() { public Thread newThread(Runnable r) { Thread t = new Thread(r); t.setName("DataMigration-" + uniqueNumber.get()); return t; }// End method }, new ThreadPoolExecutor.CallerRunsPolicy()); List<Future> result = new ArrayList<Future>(); for (Source source : sources) { result.add(threadPoolExecutor.submit(new JobExecutor(source))); } for (Future jobDone : result) { try { if (!jobDone.get(100000, TimeUnit.SECONDS) && success) { // in case of successful DbWriterClass, we don't need to change // it. success = false; } } catch (Exception ex) { // handle exceptions } } } public class JobExecutor implements Callable { private ThreadPoolExecutor threadPoolExecutor ; Source jobSource ; public SourceJobExecutor(Source source) { this.jobSource = source; threadPoolExecutor = new ThreadPoolExecutor(10,10,10, TimeUnit.SECONDS, new ArrayBlockingQueue(1024), new ThreadFactory() { public Thread newThread(Runnable r) { Thread t = new Thread(r); t.setName("Job Executor-" + uniqueNumber.get()); return t; }// End method }, new ThreadPoolExecutor.CallerRunsPolicy()); } public Boolean call() throws Exception { boolean status = true ; System.out.println("Starting Job = " + jobSource.getName()); try { // do the specified task ; }catch (InterruptedException intrEx) { logger.warn("InterruptedException", intrEx); status = false ; } catch(Exception e) { logger.fatal("Exception occurred while executing task "+jobSource.getName(),e); status = false ; } System.out.println("Ending Job = " + jobSource.getName()); return status ; } } } 

当您向执行程序提交任务时,它会返回FutureTask实例。

FutureTask.get()将重新抛出任务抛出的任何exception作为ExecutorException

因此,当您遍历List并在每个上调用get时,捕获ExecutorException并调用有序关闭。

由于您要向ThreadPoolExecutor提交任务,因此FutureTask会吞下exception。

看看这段代码

 **Inside FutureTask$Sync** void innerRun() { if (!compareAndSetState(READY, RUNNING)) return; runner = Thread.currentThread(); if (getState() == RUNNING) { // recheck after setting thread V result; try { result = callable.call(); } catch (Throwable ex) { setException(ex); return; } set(result); } else { releaseShared(0); // cancel } 

}

 protected void setException(Throwable t) { sync.innerSetException(t); } 

从上面的代码中可以看出, setException方法很setException捕获Throwable 。 由于这个原因,如果在ThreadPoolExecutor上使用“ submit() ”方法,则FutureTask会吞下所有exception

根据java 文档 ,您可以在ThreadPoolExecutor扩展afterExecute()方法

 protected void afterExecute(Runnable r, Throwable t) 

根据文档的示例代码:

 class ExtendedExecutor extends ThreadPoolExecutor { // ... protected void afterExecute(Runnable r, Throwable t) { super.afterExecute(r, t); if (t == null && r instanceof Future) { try { Object result = ((Future) r).get(); } catch (CancellationException ce) { t = ce; } catch (ExecutionException ee) { t = ee.getCause(); } catch (InterruptedException ie) { Thread.currentThread().interrupt(); // ignore/reset } } if (t != null) System.out.println(t); } } 

您可以通过三种方式捕获Exceptions

  1. Future.get()在接受的答案中建议
  2. try{}catch{}Exceptoion{}块中包装整个run()call()方法
  3. 如上所示覆盖ThreadPoolExecutor方法的afterExecute

要优雅地中断其他线程,请看下面的SE问题:

如何阻止下一个线程在ScheduledThreadPoolExecutor中运行

如何强制关闭java ExecutorService

子类ThreadPoolExecutor并覆盖其protected afterExecute (Runnable r, Throwable t)方法。

如果你是通过java.util.concurrent.Executors方便类(你不是)创建一个线程池,那么看一下它的来源,看看它是如何调用ThreadPoolExecutor的 。