处理ThreadPoolExecutor的exception
我有以下代码片段,它基本上扫描需要执行的任务列表,然后将每个任务提供给执行程序执行。
JobExecutor
反过来创建另一个执行程序(用于执行数据库内容…读取和写入数据到队列)并完成任务。
JobExecutor
为提交的任务返回Future
。 当其中一个任务失败时,我想优雅地中断所有线程并通过捕获所有exception来关闭执行程序。 我需要做哪些改变?
public class DataMovingClass { private static final AtomicInteger uniqueId = new AtomicInteger(0); private static final ThreadLocal uniqueNumber = new IDGenerator(); ThreadPoolExecutor threadPoolExecutor = null ; private List sources = new ArrayList(); private static class IDGenerator extends ThreadLocal { @Override public Integer get() { return uniqueId.incrementAndGet(); } } public void init(){ // load sources list } public boolean execute() { boolean succcess = true ; threadPoolExecutor = new ThreadPoolExecutor(10,10, 10, TimeUnit.SECONDS, new ArrayBlockingQueue(1024), new ThreadFactory() { public Thread newThread(Runnable r) { Thread t = new Thread(r); t.setName("DataMigration-" + uniqueNumber.get()); return t; }// End method }, new ThreadPoolExecutor.CallerRunsPolicy()); List<Future> result = new ArrayList<Future>(); for (Source source : sources) { result.add(threadPoolExecutor.submit(new JobExecutor(source))); } for (Future jobDone : result) { try { if (!jobDone.get(100000, TimeUnit.SECONDS) && success) { // in case of successful DbWriterClass, we don't need to change // it. success = false; } } catch (Exception ex) { // handle exceptions } } } public class JobExecutor implements Callable { private ThreadPoolExecutor threadPoolExecutor ; Source jobSource ; public SourceJobExecutor(Source source) { this.jobSource = source; threadPoolExecutor = new ThreadPoolExecutor(10,10,10, TimeUnit.SECONDS, new ArrayBlockingQueue(1024), new ThreadFactory() { public Thread newThread(Runnable r) { Thread t = new Thread(r); t.setName("Job Executor-" + uniqueNumber.get()); return t; }// End method }, new ThreadPoolExecutor.CallerRunsPolicy()); } public Boolean call() throws Exception { boolean status = true ; System.out.println("Starting Job = " + jobSource.getName()); try { // do the specified task ; }catch (InterruptedException intrEx) { logger.warn("InterruptedException", intrEx); status = false ; } catch(Exception e) { logger.fatal("Exception occurred while executing task "+jobSource.getName(),e); status = false ; } System.out.println("Ending Job = " + jobSource.getName()); return status ; } } }
当您向执行程序提交任务时,它会返回FutureTask
实例。
FutureTask.get()
将重新抛出任务抛出的任何exception作为ExecutorException
。
因此,当您遍历List
并在每个上调用get时,捕获ExecutorException
并调用有序关闭。
由于您要向ThreadPoolExecutor
提交任务,因此FutureTask
会吞下exception。
看看这段代码
**Inside FutureTask$Sync** void innerRun() { if (!compareAndSetState(READY, RUNNING)) return; runner = Thread.currentThread(); if (getState() == RUNNING) { // recheck after setting thread V result; try { result = callable.call(); } catch (Throwable ex) { setException(ex); return; } set(result); } else { releaseShared(0); // cancel }
}
protected void setException(Throwable t) { sync.innerSetException(t); }
从上面的代码中可以看出, setException
方法很setException
捕获Throwable
。 由于这个原因,如果在ThreadPoolExecutor
上使用“ submit()
”方法,则FutureTask
会吞下所有exception
根据java 文档 ,您可以在ThreadPoolExecutor
扩展afterExecute()
方法
protected void afterExecute(Runnable r, Throwable t)
根据文档的示例代码:
class ExtendedExecutor extends ThreadPoolExecutor { // ... protected void afterExecute(Runnable r, Throwable t) { super.afterExecute(r, t); if (t == null && r instanceof Future>) { try { Object result = ((Future>) r).get(); } catch (CancellationException ce) { t = ce; } catch (ExecutionException ee) { t = ee.getCause(); } catch (InterruptedException ie) { Thread.currentThread().interrupt(); // ignore/reset } } if (t != null) System.out.println(t); } }
您可以通过三种方式捕获Exceptions
-
Future.get()
在接受的答案中建议 - 在
try{}catch{}Exceptoion{}
块中包装整个run()
或call()
方法 - 如上所示覆盖
ThreadPoolExecutor
方法的afterExecute
要优雅地中断其他线程,请看下面的SE问题:
如何阻止下一个线程在ScheduledThreadPoolExecutor中运行
如何强制关闭java ExecutorService
子类ThreadPoolExecutor
并覆盖其protected afterExecute (Runnable r, Throwable t)
方法。
如果你是通过java.util.concurrent.Executors
方便类(你不是)创建一个线程池,那么看一下它的来源,看看它是如何调用ThreadPoolExecutor的 。
- BatchUpdateException:批处理不会终止
- 如何在Java servlet Web应用程序中获取未捕获的exception
- Swing Worker中的优雅exception处理
- 程序打印1-100素数并在给定范围内抛出复合数的exception
- 运行时/已检查/未检查/错误/exception之间的差异
- Javaexception – 在没有try catch的情况下处理exception
- Swing:从TableModel中捕获exception
- 为什么Java编译器允许在throws部分中列出exception,该方法无法抛出exception
- JavaFx 2.x – Swing:不在FX应用程序线程上