即使任务完成,future.isDone也返回false

我有棘手的情况,即使线程完成, future.isDone()也会返回false

 import java.util.ArrayList; import java.util.LinkedList; import java.util.List; import java.util.concurrent.ArrayBlockingQueue; import java.util.concurrent.Callable; import java.util.concurrent.CountDownLatch; import java.util.concurrent.ExecutionException; import java.util.concurrent.Future; import java.util.concurrent.ThreadPoolExecutor; import java.util.concurrent.TimeUnit; public class DataAccessor { private static ThreadPoolExecutor executor; private int timeout = 100000; static { executor = new ThreadPoolExecutor(10, 10, 1000, TimeUnit.SECONDS, new ArrayBlockingQueue(1000)); } public static void main(String[] args) { List requests = new ArrayList(); for(int i=0; i<20; i++){ requests.add("request:"+i); } DataAccessor dataAccessor = new DataAccessor(); List results = dataAccessor.getDataFromService(requests); for(ProcessedResponse response:results){ System.out.println("response"+response.toString()+"\n"); } executor.shutdown(); } public List getDataFromService(List requests) { final CountDownLatch latch = new CountDownLatch(requests.size()); List submittedJobs = new ArrayList(requests.size()); for (String request : requests) { Future future = executor.submit(new GetAndProcessResponse(request, latch)); submittedJobs.add(new SubmittedJob(future, request)); } try { if (!latch.await(timeout, TimeUnit.MILLISECONDS)) { // some of the jobs not done System.out.println("some jobs not done"); } } catch (InterruptedException e1) { // take care, or cleanup for (SubmittedJob job : submittedJobs) { job.getFuture().cancel(true); } } List results = new LinkedList(); for (SubmittedJob job : submittedJobs) { try { // before doing a get you may check if it is done if (!job.getFuture().isDone()) { // cancel job and continue with others job.getFuture().cancel(true); continue; } ProcessedResponse response = job.getFuture().get(); results.add(response); } catch (ExecutionException cause) { // exceptions occurred during execution, in any } catch (InterruptedException e) { // take care } } return results; } private class SubmittedJob { final String request; final Future future; public Future getFuture() { return future; } public String getRequest() { return request; } SubmittedJob(final Future job, final String request) { this.future = job; this.request = request; } } private class ProcessedResponse { private final String request; private final String response; ProcessedResponse(final String request, final String response) { this.request = request; this.response = response; } public String getRequest() { return request; } public String getResponse() { return response; } public String toString(){ return "[request:"+request+","+"response:"+ response+"]"; } } private class GetAndProcessResponse implements Callable { private final String request; private final CountDownLatch countDownLatch; GetAndProcessResponse(final String request, final CountDownLatch countDownLatch) { this.request = request; this.countDownLatch = countDownLatch; } public ProcessedResponse call() { try { return getAndProcessResponse(this.request); } finally { countDownLatch.countDown(); } } private ProcessedResponse getAndProcessResponse(final String request) { // do the service call // ........ if("request:16".equals(request)){ throw (new RuntimeException("runtime")); } return (new ProcessedResponse(request, "response.of." + request)); } } } 

如果我调用future.isDone()它会返回false尽管coundownLatch.await()返回true。 任何想法? 还要注意,当发生这种情况时,countDownLatch.await立即出现。

如果您在此处找到格式不可读的视图,请访问http://tinyurl.com/7j6cvep 。

问题很可能是时机问题。 在关于Future的所有任务实际完成之前 ,锁存器将被释放(因为countDown()调用在call()方法内)。

你基本上是在重新创建CompletionService的工作(实现是ExecutorCompletionService ),我建议使用它。 您可以使用poll(timeout)方法来获取结果。 只需跟踪总时间,并确保每次调用时的剩余时间减少到总剩余时间。

正如jtahlborn所说,这可能是一种竞争条件,其中CountdownLatch发出等待线程的信号,等待线程在FutureTask完成执行之前评估Future的取消条件(这将在countDown之后的某个时刻发生)。

您根本不能依赖CountdownLatch的同步机制与Future的同步机制同步。 你应该做的是依靠未来来告诉你何时完成。

您可以使用Future.get(long timeout, TimeUnit.MILLISECONDS)而不是CountdownLatch.await(long timeout, TimeUnit.MILLISECONDS) 。 要获得与锁存器相同类型的效果,您可以将所有Future s添加到List ,遍历列表并获取每个Future。

以下是竞争条件的情景:

  • 主线程在latch.await ,它从Java调度程序接收没有CPU插槽毫秒
  • 最后一个执行程序线程调用finally子句中的countDownLatch.countDown()
  • Java调度程序决定为主线程提供更多优先级,因为它等待了一段时间
  • 因此,当它要求最后一个Future结果时,它还不可用,因为最后一个执行程序线程没有时间片来传播结果,它仍然在finally ……

我还没有找到关于Java调度程序如何工作的详细解释,可能是因为它主要依赖于运行JVM的操作系统,但一般来说它试图在一段时间内平均给CPU提供可运行的线程。 这就是为什么主线程可以在另一个离开finally子句之前到达isDone测试的原因。

我建议你改变你的结果’收集latch.await后。 如您所知,锁存器已减少到零(除非主线程被中断),所有结果应该很快就可用。 带有超时的get方法让调度程序有机会将时间片分配给仍在finally子句中等待的最后一个线程:

  for (SubmittedJob job : submittedJobs) { try { ProcessedResponse response = null; try { // Try to get answer in short timeout, should be available response = job.getFuture().get(10, TimeUnit.MILLISECONDS); } catch (TimeoutException te) { job.getFuture().cancel(true); continue; } results.add(response); } catch (ExecutionException cause) { // exceptions occurred during execution, in any } catch (InterruptedException e) { // take care } } 

注意:您的代码不实际,因为getAndProcessResponse方法在不到一毫秒的时间内结束。 在那里随机睡觉,竞争条件不常出现。

我是对竞争条件的看法。 我建议忘记锁存器并使用java.util.concurrent.ThreadPoolExecutor.awaitTermination(long, TimeUnit)