如何终止multithreading中超时的任务?

我需要创建一个库,在其中我将有同步和异步方法。

  • executeSynchronous() – 等到我有结果,返回结果。
  • executeAsynchronous() – 立即返回一个Future,如果需要,可以在其他事情完成后处理。

我的图书馆的核心逻辑

客户将使用我们的库,他们将通过传递DataKey构建器对象来调用它。 然后,我们将使用该DataKey对象构造一个URL,并通过执行它来对该URL进行HTTP客户端调用,然后在我们将响应作为JSON字符串返回之后,我们将通过创建将该JSON字符串发送回我们的客户DataResponse对象。 有些客户会调用executeSynchronous() ,有些可能会调用executeAsynchronous() ,这就是为什么我需要在我的库中单独提供两个方法。

接口:

 public interface Client { // for synchronous public DataResponse executeSynchronous(DataKey key); // for asynchronous public Future executeAsynchronous(DataKey key); } 

然后我有我的DataClient实现上面的Client接口:

 public class DataClient implements Client { private RestTemplate restTemplate = new RestTemplate(); private ExecutorService executor = Executors.newFixedThreadPool(10); // for synchronous call @Override public DataResponse executeSynchronous(DataKey key) { DataResponse dataResponse = null; Future future = null; try { future = executeAsynchronous(key); dataResponse = future.get(key.getTimeout(), TimeUnit.MILLISECONDS); } catch (TimeoutException ex) { PotoLogging.logErrors(ex, DataErrorEnum.TIMEOUT_ON_CLIENT, key); dataResponse = new DataResponse(null, DataErrorEnum.TIMEOUT_ON_CLIENT, DataStatusEnum.ERROR); // does this look right the way I am doing it? future.cancel(true); // terminating tasks that have timed out. } catch (Exception ex) { PotoLogging.logErrors(ex, DataErrorEnum.CLIENT_ERROR, key); dataResponse = new DataResponse(null, DataErrorEnum.CLIENT_ERROR, DataStatusEnum.ERROR); } return dataResponse; } //for asynchronous call @Override public Future executeAsynchronous(DataKey key) { Future future = null; try { Task task = new Task(key, restTemplate); future = executor.submit(task); } catch (Exception ex) { PotoLogging.logErrors(ex, DataErrorEnum.CLIENT_ERROR, key); } return future; } } 

将执行实际任务的简单类:

 public class Task implements Callable { private DataKey key; private RestTemplate restTemplate; public Task(DataKey key, RestTemplate restTemplate) { this.key = key; this.restTemplate = restTemplate; } @Override public DataResponse call() { DataResponse dataResponse = null; String response = null; try { String url = createURL(); response = restTemplate.getForObject(url, String.class); // it is a successful response dataResponse = new DataResponse(response, DataErrorEnum.NONE, DataStatusEnum.SUCCESS); } catch (RestClientException ex) { PotoLogging.logErrors(ex, DataErrorEnum.SERVER_DOWN, key); dataResponse = new DataResponse(null, DataErrorEnum.SERVER_DOWN, DataStatusEnum.ERROR); } catch (Exception ex) { PotoLogging.logErrors(ex, DataErrorEnum.CLIENT_ERROR, key); dataResponse = new DataResponse(null, DataErrorEnum.CLIENT_ERROR, DataStatusEnum.ERROR); } return dataResponse; } // create a URL by using key object private String createURL() { String url = somecode; return url; } } 

问题陈述:-

当我开始研究这个解决方案时,我并没有终止已经超时的任务。 我向客户端报告超时,但任务继续在线程池中运行(可能长时间占用我有限的10个线程之一)。 所以我在网上进行了一些研究,我发现我可以通过在future使用cancelcancel已经超时的任务,如下所示 –

 future.cancel(true); 

但我想确保,在我的executeSynchronous方法中取消已经超时的任务的方式是否正确?

因为我在Future上调用cancel() ,如果任务仍然在队列中,它将阻止它运行,所以我不确定我在做什么是对还是不对? 这样做的正确方法是什么?

如果有更好的方法,那么任何人都能提供一个例子吗?

如果任务仍然在队列中,那么通过简单地调用future.cancel()来取消它是可以的,但显然你不知道它是否在队列中。 即使你要求future中断任务,它也可能无法工作,因为你的任务仍然可以做一些忽略线程中断状态的事情。

因此,您可以使用future.cancel(true)但您需要确保您的任务(线程)确实考虑线程中断状态。 例如,如您所述,您进行了http调用,因此您可能需要在线程中断时立即关闭http客户端资源。

请参考下面的示例。

我试图实现任务取消方案。 通常,线程可以检查isInterrupted()并尝试终止自身。 但是当你使用线程池执行器时,这变得更加复杂,可调用,如果任务不是真的像while(!Thread.isInterrupted()) {// execute task}

在这个例子中,一个任务是写一个文件(我没有使用http调用来保持简单)。 线程池执行程序开始运行任务,但调用者想要在100毫秒后取消它。 现在将来将中断信号发送给线程,但是可调用任务在写入文件时不能立即检查它。 因此,为了使这种情况发生可调用,它会维护一个将要使用的IO资源列表,并且一旦未来想要取消该任务,它就会在所有IO资源上调用cancel() ,以IOException终止任务,然后完成线程。

 public class CancellableTaskTest { public static void main(String[] args) throws Exception { CancellableThreadPoolExecutor threadPoolExecutor = new CancellableThreadPoolExecutor(0, 10, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue()); long startTime = System.currentTimeMillis(); Future future = threadPoolExecutor.submit(new CancellableTask()); while (System.currentTimeMillis() - startTime < 100) { Thread.sleep(10); } System.out.println("Trying to cancel task"); future.cancel(true); } } class CancellableThreadPoolExecutor extends ThreadPoolExecutor { public CancellableThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue workQueue) { super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue); } @Override protected  RunnableFuture newTaskFor(Callable callable) { return new CancellableFutureTask(callable); } } class CancellableFutureTask extends FutureTask { private WeakReference weakReference; public CancellableFutureTask(Callable callable) { super(callable); if (callable instanceof CancellableTask) { this.weakReference = new WeakReference((CancellableTask) callable); } } public boolean cancel(boolean mayInterruptIfRunning) { boolean result = super.cancel(mayInterruptIfRunning); if (weakReference != null) { CancellableTask task = weakReference.get(); if (task != null) { try { task.cancel(); } catch (Exception e) { e.printStackTrace(); result = false; } } } return result; } } class CancellableTask implements Callable { private volatile boolean cancelled; private final Object lock = new Object(); private LinkedList cancellableResources = new LinkedList(); @Override public String call() throws Exception { if (!cancelled) { System.out.println("Task started"); // write file File file = File.createTempFile("testfile", ".txt"); BufferedWriter writer = new BufferedWriter(new FileWriter(file)); synchronized (lock) { cancellableResources.add(writer); } try { long lineCount = 0; while (lineCount++ < 100000000) { writer.write("This is a test text at line: " + lineCount); writer.newLine(); } System.out.println("Task completed"); } catch (Exception e) { e.printStackTrace(); } finally { writer.close(); file.delete(); synchronized (lock) { cancellableResources.clear(); } } } return "done"; } public void cancel() throws Exception { cancelled = true; Thread.sleep(1000); boolean success = false; synchronized (lock) { for (Object cancellableResource : cancellableResources) { if (cancellableResource instanceof Closeable) { ((Closeable) cancellableResource).close(); success = true; } } } System.out.println("Task " + (success ? "cancelled" : "could not be cancelled. It might have completed or not started at all")); } } 

对于您的REST Http客户端相关要求,您可以修改类似于此的工厂类 -

 public class CancellableSimpleClientHttpRequestFactory extends SimpleClientHttpRequestFactory { private List cancellableResources; public CancellableSimpleClientHttpRequestFactory() { } public CancellableSimpleClientHttpRequestFactory(List cancellableResources) { this.cancellableResources = cancellableResources; } protected HttpURLConnection openConnection(URL url, Proxy proxy) throws IOException { HttpURLConnection connection = super.openConnection(url, proxy); if (cancellableResources != null) { cancellableResources.add(connection); } return connection; } } 

在这里,您需要在可运行任务中创建RestTemplate使用此工厂。

  RestTemplate template = new RestTemplate(new CancellableSimpleClientHttpRequestFactory(this.cancellableResources)); 

确保您传递了在CancellableTask维护的相同可取消资源列表。

现在你需要修改CancellableTaskcancel()方法,如下所示 -

 synchronized (lock) { for (Object cancellableResource : cancellableResources) { if (cancellableResource instanceof HttpURLConnection) { ((HttpURLConnection) cancellableResource).disconnect(); success = true; } } }