一旦我的线程中断,我怎么能中断RestTemplate调用?

我需要创建一个库,在其中我将具有同步和异步function。

  • executeSynchronous() – 等到我有结果,返回结果。
  • executeAsynchronous() – 立即返回一个Future,如果需要,可以在其他事情完成后处理。

我的图书馆的核心逻辑

客户将使用我们的库,他们将通过传递DataKey构建器对象来调用它。 然后,我们将使用该DataKey对象构造一个URL,并通过执行它来对该URL进行HTTP客户端调用,然后在我们将响应作为JSON字符串返回之后,我们将通过创建将该JSON字符串发送回我们的客户DataResponse对象。 有些客户会调用executeSynchronous() ,有些可能会调用executeAsynchronous() ,这就是为什么我需要在我的库中单独提供两个方法。

接口:

 public interface Client { // for synchronous public DataResponse executeSynchronous(DataKey key); // for asynchronous public Future executeAsynchronous(DataKey key); } 

然后我有我的DataClient实现上面的Client接口:

 public class DataClient implements Client { private RestTemplate restTemplate = new RestTemplate(); private ExecutorService executor = Executors.newFixedThreadPool(10); // for synchronous call @Override public DataResponse executeSynchronous(DataKey key) { DataResponse dataResponse = null; Future future = null; try { future = executeAsynchronous(key); dataResponse = future.get(key.getTimeout(), TimeUnit.MILLISECONDS); } catch (TimeoutException ex) { PotoLogging.logErrors(ex, DataErrorEnum.TIMEOUT_ON_CLIENT, key); dataResponse = new DataResponse(null, DataErrorEnum.TIMEOUT_ON_CLIENT, DataStatusEnum.ERROR); // does this looks right? future.cancel(true); // terminating tasks that have timed out } catch (Exception ex) { PotoLogging.logErrors(ex, DataErrorEnum.CLIENT_ERROR, key); dataResponse = new DataResponse(null, DataErrorEnum.CLIENT_ERROR, DataStatusEnum.ERROR); } return dataResponse; } //for asynchronous call @Override public Future executeAsynchronous(DataKey key) { Future future = null; try { Task task = new Task(key, restTemplate); future = executor.submit(task); } catch (Exception ex) { PotoLogging.logErrors(ex, DataErrorEnum.CLIENT_ERROR, key); } return future; } } 

将执行实际任务的简单类:

 public class Task implements Callable { private DataKey key; private RestTemplate restTemplate; public Task(DataKey key, RestTemplate restTemplate) { this.key = key; this.restTemplate = restTemplate; } @Override public DataResponse call() { DataResponse dataResponse = null; String response = null; try { String url = createURL(); response = restTemplate.getForObject(url, String.class); // it is a successful response dataResponse = new DataResponse(response, DataErrorEnum.NONE, DataStatusEnum.SUCCESS); } catch (RestClientException ex) { PotoLogging.logErrors(ex, DataErrorEnum.SERVER_DOWN, key); dataResponse = new DataResponse(null, DataErrorEnum.SERVER_DOWN, DataStatusEnum.ERROR); } catch (Exception ex) { PotoLogging.logErrors(ex, DataErrorEnum.CLIENT_ERROR, key); dataResponse = new DataResponse(null, DataErrorEnum.CLIENT_ERROR, DataStatusEnum.ERROR); } return dataResponse; } // create a URL by using key object private String createURL() { String url = somecode; return url; } } 

当我开始研究这个解决方案时,我并没有终止已经超时的任务。 我向客户端报告超时,但任务继续在线程池中运行(可能长时间占用我有限的10个线程之一)。 所以我在网上进行了一些研究,我发现我可以通过在将来使用取消来取消已经超时的任务,如下所示 –

 future.cancel(true); 

但是,如果我按照上面的解决方案所示这样做,那么一旦线程被中断,我是否需要关闭RestTemplate类的任何其他资源? 如果是,那我该怎么做? 另外,我们可以中断RestTemplate调用吗? 因为我一旦任务超时就尝试取消取消我的未来,但我猜我的线程没有被打断。

我们是否应该永远终止已经超时的任务? 如果我们不这样做那么可能会产生什么影响? 它会影响我的表现吗?

我目前的设置是否有更好的解决方案来处理这种情况?

有时无法中断线程,尤其是当线程在Socket上执行阻塞操作时。

因此,不应在超时时取消任务,而应该在http连接上设置超时。

不幸的是,每个Connection Factory和RestTemplate设置了timeousts,因此每个请求必须使用它自己的RestTemplate。

您可以为每个任务创建新的RestTemplate,也可以使用ThreadLocal或资源池重用以前创建的模板。

例如,使用Thread local的任务可能如下所示:

  public class Task implements Callable { private DataKey key; private ThreadLocal restTemplateThreadLocal = ThreadLocal.withInitial(()->new RestTemplate(new SimpleClientHttpRequestFactory())); public Task(DataKey key) { this.key = key; } private SimpleClientHttpRequestFactory getConnectionFactory(){ return (SimpleClientHttpRequestFactory)restTemplateThreadLocal.get().getRequestFactory(); } @Override public DataResponse call() { DataResponse dataResponse = null; String response = null; try { String url = createURL(); //it is up to you, how to set connection and read timeouts from provided key.getTimeout getConnectionFactory().setConnectTimeout(1000); getConnectionFactory().setReadTimeout(key.getTimeout()); response = restTemplateThreadLocal.get().getForObject(url, String.class); // it is a successful response dataResponse = new DataResponse(response, DataErrorEnum.NONE, DataStatusEnum.SUCCESS); } catch (RestClientException ex) { PotoLogging.logErrors(ex, DataErrorEnum.SERVER_DOWN, key); dataResponse = new DataResponse(null, DataErrorEnum.SERVER_DOWN, DataStatusEnum.ERROR); } catch (Exception ex) { PotoLogging.logErrors(ex, DataErrorEnum.CLIENT_ERROR, key); dataResponse = new DataResponse(null, DataErrorEnum.CLIENT_ERROR, DataStatusEnum.ERROR); } return dataResponse; } // create a URL by using key object private String createURL() { String url = somecode; return url; } } 

BTW。 Spring还提供了AsyncRestTemplate,它可以使您的代码更简单。 如果与Netty4ClientHttpRequestFactory一起使用,您可以获得基于NIO的客户端连接。 在这种情况下,即使在进行Http连接时,您也应该能够中断任务。

以下简短示例。 它使用NIO,因此您无需关心超时后是否真的取消了请求。

  URI url = new URI("http://www.chicagotribune.com/news/ct-college-of-dupage-investigation-met-20150330-story.html"); Netty4ClientHttpRequestFactory asyncRequestFactory = new Netty4ClientHttpRequestFactory(); AsyncRestTemplate asyncRestTemplate = new AsyncRestTemplate(asyncRequestFactory); ListenableFuture> entity = asyncRestTemplate.getForEntity(url, String.class); System.out.println("entity.get() = " + entity.get()); asyncRequestFactory.destroy(); 

似乎无法中断或取消对RestTemplate的调用。 即使使用回调的“kludge”, RestTemplate也可能在内部锁定资源,在调用回调之前等待响应。

当可以访问底层套接字时,可以通过从另一个线程关闭套接字来中止网络I / O. 例如,可以在超时过后启动计时器以关闭套接字。 或者,如果您想要一个对中断敏感的无限期超时(例如,由于用户按下“取消”按钮),您可以提交一个无限期等待但通过关闭套接字来响应中断的任务。

不幸的是,它看起来并不像RestTemplate的作者提供了这种能力。

是的,您应该清理由于任务取消或到期而不再需要的资源。 是的,它会影响性能。 如果您的线程池具有有限数量的线程,那么最终所有线程都将停留在已失效的任务中。 如果它具有无限数量的线程,最终内存将耗尽。