扩展FutureTask,如何处理取消

我已经从java.util.concurrent扩展了FutureTask ,以提供回调来跟踪提交给ExecutorService的任务的ExecutorService

 public class StatusTask extends FutureTask { private final ITaskStatusHandler statusHandler; public StatusTask(Callable callable, ITaskStatusHandler statusHandler){ super(callable); if (statusHandler == null) throw new NullPointerException("statusHandler cannot be null"); this.statusHandler = statusHandler; statusHandler.TaskCreated(this); } @Override public void run() { statusHandler.TaskRunning(this); super.run(); } @Override protected void done() { super.done(); statusHandler.TaskCompleted(this); } } 

现在,我看到的是,如果任务已提交,但最终排队并cancel(true); 任务 – 仍然调用run()方法 – 并且FutureTask.run() (可能)检查任务是否被取消,并且不调用包装的可调用对象。

我应该这样做吗

 @Override public void run() { if(!isCancelled()) { statusHandler.TaskRunning(this); super.run(); } } 

或者我还应该打电话给super.run() ? 这两种方法在检查取消和做某事之间似乎容易受到竞争条件的影响。任何想法都值得赞赏。

你说那里有一场比赛你是对的。 FutureTask#done() 最多只会调用一次 ,所以如果任务在通过RunnableFuture#run()运行之前已被取消,那么你将错过对FutureTask#done()的调用。

您是否考虑过一种更简单的方法,它始终向ITaskStatusHandler#taskRunning()ITaskStatusHandler#taskCompleted()发出一组对称的调用,如此?

 @Override public void run() { statusHandler.TaskRunning(this); try { super.run(); finally { statusHandler.TaskCompleted(this); } } 

一旦RunnableFuture#run()被调用,你的任务就是在运行,或者至少是在尝试运行。 FutureTask#run()完成后,您的任务就不再运行了。 碰巧的是,在取消的情况下,过渡是(几乎)立即的。

试图避免调用ITaskStatusHandler#taskRunning()如果FutureTask#run()从未调用内部CallableRunnable FutureTask#run()将要求您在CallableRunnableFutureTask -derived类型本身之间建立一些共享结构,这样当你的内心函数首先调用你设置一些外部FutureTask派生类型可以作为锁存器观察的标志,表示是,该函数被取消之前确实开始运行。 但是,到那时,你必须已经致力于调用ITaskStatusHandler#taskRunning() ,所以这种区别并没有那么有用。

我最近一直在努力解决类似的设计问题,最后在我的被覆盖的FutureTask#run()方法中解决了操作 前后的对称问题。

你的问题是你的未来任务在你取消取消后仍然执行,对吧?

在将任务提交给执行程序服务之后,它应由执行程序管理。 (如果愿意,您仍然可以取消单个任务。)您应该使用executor shutdownNow方法取消执行。 (这将调用所有提交任务的cancel方法。)关闭仍将执行所有提交的任务。

执行者不会“知道”任务被取消。 它将独立于未来任务的内部状态调用该方法。

最简单的方法是按原样使用Executor框架并编写一个Callable装饰器。

 class CallableDecorator{ CallableDecorator(Decorated decorated){ ... } setTask(FutureTask task){ statusHandler.taskCreated(task); } void call(){ try{ statusHandler.taskRunning(task); decorated.call(); }finally{ statusHandler.taskCompleted(task); } } } 

唯一的问题是任务不能在装饰器的构造函数中。 (它是未来任务构造函数的参数。)要打破此循环,您必须使用setter或某些代理来解决构造函数注入问题。 也许根本不需要回调,你可以说: statusHandler.callableStarted(decorated)

根据您的要求,您可能需要发出exception和中断信号。

基本实施:

 class CallableDecorator implements Callable { private final Callable decorated; CallableDecorator(Callable decorated){ this.decorated = decorated; } @Override public T call() throws Exception { out.println("before " + currentThread()); try { return decorated.call(); }catch(InterruptedException e){ out.println("interupted " + currentThread()); throw e; } finally { out.println("after " + currentThread()); } } } ExecutorService executor = newFixedThreadPool(1); Future normal = executor.submit(new CallableDecorator( new Callable() { @Override public Long call() throws Exception { return System.currentTimeMillis(); } })); out.println(normal.get()); Future blocking = executor.submit(new CallableDecorator( new Callable() { @Override public Long call() throws Exception { sleep(MINUTES.toMillis(2)); // blocking call return null; } })); sleep(SECONDS.toMillis(1)); blocking.cancel(true); // or executor.shutdownNow(); 

输出:

 before Thread[pool-1-thread-1,5,main] after Thread[pool-1-thread-1,5,main] 1259347519104 before Thread[pool-1-thread-1,5,main] interupted Thread[pool-1-thread-1,5,main] after Thread[pool-1-thread-1,5,main]