如何实现或找到线程安全的CompletionService的等价物?

我有一个在Tomcat容器中运行的简单Web服务,它本质上是multithreading的。 在进入服务的每个请求中,我想要对外部服务进行并发调用。 java.util.concurrent中的ExecutorCompletionService使我部分得到了。 我可以为它提供一个线程池,它将负责执行我的并发调用,当任何结果准备就绪时我会收到通知。

处理特定传入请求的代码可能如下所示:

void handleRequest(Integer[] input) { // Submit tasks CompletionService completionService = new ExecutorCompletionService(Executors.newCachedThreadPool()); for (final Integer i : input) { completionService.submit(new Callable() { public Integer call() { return -1 * i; } }); } // Do other stuff... // Get task results try { for (int i = 0; i < input.size; i++) { Future future = completionService.take(); Integer result = future.get(); // Do something with the result... } } catch (Exception e) { // Handle exception } } 

这应该工作正常和花花公子,但由于为每个传入请求分配了新的线程池,因此效率非常低。 如果我将CompletionService作为共享实例移出,我将遇到多个请求共享相同的CompletionService和线程池的线程安全问题。 当请求提交任务并获得结果时,他们得到的结果不是他们提交的结果。

因此,我需要的是一个线程安全的CompletionService,它允许我跨所有传入请求共享一个公共线程池。 当每个线程完成一个任务时,应该通知传入请求的相应线程,以便它可以收集结果。

实现此类function的最直接方法是什么? 我确信这种模式已经应用了很多次; 我只是不确定这是Java并发库提供的东西,还是可以使用一些Java并发构建块轻松构建。

更新:我忘了提到的一个警告是,我希望在我提交的任何任务完成后立即得到通知。 这是使用CompletionService的主要优势,因为它分离了任务和结果的生产和消费。 我实际上并不关心我得到结果的顺序,并且我希望在等待按顺序返回结果时避免不必要的阻塞。

您共享Executor但不共享CompletionService

我们有一个完全相同的AsyncCompleter并处理所有簿记,允许您:

 Iterable> jobs = jobs(); Iterable results async.invokeAll(jobs); 

results按返回顺序迭代并阻塞,直到结果可用

您可以使用普通的共享ExecutorService。 每当您提交任务时,您将获得刚刚提交的任务的Future。 您可以将所有这些存储在列表中,稍后再查询。

例:

 private final ExecutorService service = ...//a single, shared instance void handleRequest(Integer[] input) { // Submit tasks List> futures = new ArrayList>(input.length); for (final Integer i : input) { Future future = service.submit(new Callable() { public Integer call() { return -1 * i; } }); futures.add(future); } // Do other stuff... // Get task results for(Future f : futures){ try { Integer result = f.get(); } catch (Exception e) { e.printStackTrace(); } } } 

java.util.concurrent提供了您需要的一切。 如果我正确理解您的问题,您有以下要求:

您想要提交请求,并立即(在合理范围内)处理请求结果(响应)。 好吧,我相信你已经看到了你的问题的解决方案:java.util.concurrent.CompletionService。

该服务相当简单地将Executor和BlockingQueue组合在一起,以处理Runnable和/或Callable任务。 BlockingQueue用于保存已完成的任务,您可以让另一个线程等待,直到完成的任务排队(这是通过在CompletionService对象上调用take()来完成的)。

正如之前的海报所提到的,共享Executor,并为每个请求创建一个CompletionService。 这似乎是一件昂贵的事情,但再次考虑CS只是与Executor和BlockingQueue合作。 既然你正在共享最昂贵的实例化对象,即Executor,我想你会发现这是一个非常合理的成本。

但是……所有这一切,你似乎仍然有一个问题,这个问题似乎是请求处理与响应处理的分离。 这可以通过创建一个单独的服务来处理,该服务专门处理所有请求或特定类型的请求的响应。

下面是一个例子:(注意:它暗示Request对象实现了Callable接口,它应该返回一个Response类型……我已经为这个简单的例子省略了细节)。

 class RequestHandler { RequestHandler(ExecutorService responseExecutor, ResponseHandler responseHandler) { this.responseQueue = ... this.executor = ... } public void acceptRequest(List requestList) { for(Request req : requestList) { Response response = executor.submit(req); responseHandler.handleResponse(response); } } } class ResponseHandler { ReentrantLock lock; ResponseHandler(ExecutorService responseExecutor) { ... } public void handleResponse(Response res) { lock.lock() { try { responseExecutor.submit( new ResponseWorker(res) ); } finally { lock.unlock(); } } private static class ResponseWorker implements Runnable { ResponseWorker(Response response) { response = ... } void processResponse() { // process this response } public void run() { processResponse(); } } } 

需要记住的几件事:一,ExecutorService从阻塞队列执行Callables或Runnables; 您的RequestHandler接收任务,并在Executor上排队,并尽快处理。 在ResponseHandler中也会发生同样的事情; 收到响应,一旦SEPARATE执行程序可以,它将处理该响应。 简而言之,您有两个执行器同时工作:一个在Request对象上,另一个在Response对象上。

为什么需要CompletionService

每个线程都可以简单地在ExecutorService的“常规”和共享实例上提交或调用Callables 。 然后每个线程都保留自己的私有Future引用。

此外, Executor及其后代的设计是线程安全的。 你真正想要的是每个线程可以创建自己的任务并检查他们的结果。

java.util.concurrent的Javadoc非常好; 它包括使用模式和示例。 阅读ExecutorService和其他类型的文档,以更好地了解如何使用它们。