扩展FutureTask,如何处理取消
我已经从java.util.concurrent
扩展了FutureTask
,以提供回调来跟踪提交给ExecutorService
的任务的ExecutorService
。
public class StatusTask extends FutureTask { private final ITaskStatusHandler statusHandler; public StatusTask(Callable callable, ITaskStatusHandler statusHandler){ super(callable); if (statusHandler == null) throw new NullPointerException("statusHandler cannot be null"); this.statusHandler = statusHandler; statusHandler.TaskCreated(this); } @Override public void run() { statusHandler.TaskRunning(this); super.run(); } @Override protected void done() { super.done(); statusHandler.TaskCompleted(this); } }
现在,我看到的是,如果任务已提交,但最终排队并cancel(true);
任务 – 仍然调用run()
方法 – 并且FutureTask.run()
(可能)检查任务是否被取消,并且不调用包装的可调用对象。
我应该这样做吗
@Override public void run() { if(!isCancelled()) { statusHandler.TaskRunning(this); super.run(); } }
或者我还应该打电话给super.run()
? 这两种方法在检查取消和做某事之间似乎容易受到竞争条件的影响。任何想法都值得赞赏。
你说那里有一场比赛你是对的。 FutureTask#done()
最多只会调用一次 ,所以如果任务在通过RunnableFuture#run()
运行之前已被取消,那么你将错过对FutureTask#done()
的调用。
您是否考虑过一种更简单的方法,它始终向ITaskStatusHandler#taskRunning()
和ITaskStatusHandler#taskCompleted()
发出一组对称的调用,如此?
@Override public void run() { statusHandler.TaskRunning(this); try { super.run(); finally { statusHandler.TaskCompleted(this); } }
一旦RunnableFuture#run()
被调用,你的任务就是在运行,或者至少是在尝试运行。 FutureTask#run()
完成后,您的任务就不再运行了。 碰巧的是,在取消的情况下,过渡是(几乎)立即的。
试图避免调用ITaskStatusHandler#taskRunning()
如果FutureTask#run()
从未调用内部Callable
或Runnable
FutureTask#run()
将要求您在Callable
或Runnable
和FutureTask
-derived类型本身之间建立一些共享结构,这样当你的内心函数首先调用你设置一些外部FutureTask
派生类型可以作为锁存器观察的标志,表示是,该函数在被取消之前确实开始运行。 但是,到那时,你必须已经致力于调用ITaskStatusHandler#taskRunning()
,所以这种区别并没有那么有用。
我最近一直在努力解决类似的设计问题,最后在我的被覆盖的FutureTask#run()
方法中解决了操作 前后的对称问题。
你的问题是你的未来任务在你取消取消后仍然执行,对吧?
在将任务提交给执行程序服务之后,它应由执行程序管理。 (如果愿意,您仍然可以取消单个任务。)您应该使用executor shutdownNow方法取消执行。 (这将调用所有提交任务的cancel方法。)关闭仍将执行所有提交的任务。
执行者不会“知道”任务被取消。 它将独立于未来任务的内部状态调用该方法。
最简单的方法是按原样使用Executor框架并编写一个Callable装饰器。
class CallableDecorator{ CallableDecorator(Decorated decorated){ ... } setTask(FutureTask task){ statusHandler.taskCreated(task); } void call(){ try{ statusHandler.taskRunning(task); decorated.call(); }finally{ statusHandler.taskCompleted(task); } } }
唯一的问题是任务不能在装饰器的构造函数中。 (它是未来任务构造函数的参数。)要打破此循环,您必须使用setter或某些代理来解决构造函数注入问题。 也许根本不需要回调,你可以说: statusHandler.callableStarted(decorated)
。
根据您的要求,您可能需要发出exception和中断信号。
基本实施:
class CallableDecorator implements Callable { private final Callable decorated; CallableDecorator(Callable decorated){ this.decorated = decorated; } @Override public T call() throws Exception { out.println("before " + currentThread()); try { return decorated.call(); }catch(InterruptedException e){ out.println("interupted " + currentThread()); throw e; } finally { out.println("after " + currentThread()); } } } ExecutorService executor = newFixedThreadPool(1); Future normal = executor.submit(new CallableDecorator ( new Callable () { @Override public Long call() throws Exception { return System.currentTimeMillis(); } })); out.println(normal.get()); Future blocking = executor.submit(new CallableDecorator ( new Callable () { @Override public Long call() throws Exception { sleep(MINUTES.toMillis(2)); // blocking call return null; } })); sleep(SECONDS.toMillis(1)); blocking.cancel(true); // or executor.shutdownNow();
输出:
before Thread[pool-1-thread-1,5,main] after Thread[pool-1-thread-1,5,main] 1259347519104 before Thread[pool-1-thread-1,5,main] interupted Thread[pool-1-thread-1,5,main] after Thread[pool-1-thread-1,5,main]