执行程序:如果递归创建任务,如何同步等待所有任务完成?

我的问题与此问题密切相关。 正如在那里发布的那样,我希望主线程等到工作队列为空并且所有任务都已完成。 然而,在我的情况下,问题是每个任务可以递归地导致提交新任务以进行处理。 这使收集所有这些任务的未来变得有点尴尬。

我们当前的解决方案使用忙等待循环来等待终止:

do { //Wait until we are done the processing try { Thread.sleep(200); } catch (InterruptedException e) { throw new RuntimeException(e); } } while (!executor.getQueue().isEmpty() || numTasks.longValue() > executor.getCompletedTaskCount()); 

numTasks是一个在创建每个新任务时增加的值。 这有效但我认为由于忙碌的等待而不是很好。 我想知道是否有一种好方法可以使主线程同步等待,直到被明确唤醒。

非常感谢你的所有建议!

最后,我选择了一些我认为相当简单的东西。 我发现CountDownLatch几乎就是我所需要的。 它会阻塞,直到计数器达到0.唯一的问题是它只能倒计时,而不是向上,因此在动态设置中不起作用,我可以在任务中提交新任务。 因此我实现了一个新的类CountLatch ,它提供了额外的function。 (见下文)这个课我然后使用如下。

主线程调用latch.awaitZero() ,阻塞直到latch达到0。

任何线程,在调用executor.execute(..)之前调用latch.increment()

在完成之前的任何任务都调用latch.decrement()

当最后一个任务终止时,计数器将达到0,从而释放主线程。

欢迎提供进一步的建议和反馈!

 public class CountLatch { @SuppressWarnings("serial") private static final class Sync extends AbstractQueuedSynchronizer { Sync(int count) { setState(count); } int getCount() { return getState(); } protected int tryAcquireShared(int acquires) { return (getState() == 0) ? 1 : -1; } protected int acquireNonBlocking(int acquires) { // increment count for (;;) { int c = getState(); int nextc = c + 1; if (compareAndSetState(c, nextc)) return 1; } } protected boolean tryReleaseShared(int releases) { // Decrement count; signal when transition to zero for (;;) { int c = getState(); if (c == 0) return false; int nextc = c - 1; if (compareAndSetState(c, nextc)) return nextc == 0; } } } private final Sync sync; public CountLatch(int count) { this.sync = new Sync(count); } public void awaitZero() throws InterruptedException { sync.acquireSharedInterruptibly(1); } public boolean awaitZero(long timeout, TimeUnit unit) throws InterruptedException { return sync.tryAcquireSharedNanos(1, unit.toNanos(timeout)); } public void increment() { sync.acquireNonBlocking(1); } public void decrement() { sync.releaseShared(1); } public String toString() { return super.toString() + "[Count = " + sync.getCount() + "]"; } } 

请注意, increment() / decrement()调用可以封装到自定义的Executor子类中,例如,由Sami Korhonen建议,或者由impl建议的beforeExecuteafterExecute 。 看这里:

 public class CountingThreadPoolExecutor extends ThreadPoolExecutor { protected final CountLatch numRunningTasks = new CountLatch(0); public CountingThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue workQueue) { super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue); } @Override public void execute(Runnable command) { numRunningTasks.increment(); super.execute(command); } @Override protected void afterExecute(Runnable r, Throwable t) { numRunningTasks.decrement(); super.afterExecute(r, t); } /** * Awaits the completion of all spawned tasks. */ public void awaitCompletion() throws InterruptedException { numRunningTasks.awaitZero(); } /** * Awaits the completion of all spawned tasks. */ public void awaitCompletion(long timeout, TimeUnit unit) throws InterruptedException { numRunningTasks.awaitZero(timeout, unit); } } 

这个实际上是一个非常有趣的问题需要解决。 我必须警告我没有完全测试代码。

想法是简单地跟踪任务执行:

  • 如果任务成功排队,则计数器加1
  • 如果任务被取消且尚未执行,则计数器减1
  • 如果任务已执行,则计数器减1

当调用shutdown并且有待处理的任务时,delegate不会在实际的ExecutorService上调用shutdown。 它将允许排队新任务,直到挂起的任务计数达到零,并在实际的ExecutorService上调用shutdown。

 public class ResilientExecutorServiceDelegate implements ExecutorService { private final ExecutorService executorService; private final AtomicInteger pendingTasks; private final Lock readLock; private final Lock writeLock; private boolean isShutdown; public ResilientExecutorServiceDelegate(ExecutorService executorService) { ReadWriteLock readWriteLock = new ReentrantReadWriteLock(); this.pendingTasks = new AtomicInteger(); this.readLock = readWriteLock.readLock(); this.writeLock = readWriteLock.writeLock(); this.executorService = executorService; this.isShutdown = false; } private  T addTask(Callable task) { T result; boolean success = false; // Increment pending tasks counter incrementPendingTaskCount(); try { // Call service result = task.call(); success = true; } catch (RuntimeException exception) { throw exception; } catch (Exception exception) { throw new RejectedExecutionException(exception); } finally { if (!success) { // Decrement pending tasks counter decrementPendingTaskCount(); } } return result; } private void incrementPendingTaskCount() { pendingTasks.incrementAndGet(); } private void decrementPendingTaskCount() { readLock.lock(); if (pendingTasks.decrementAndGet() == 0 && isShutdown) { try { // Shutdown executorService.shutdown(); } catch (Throwable throwable) { } } readLock.unlock(); } @Override public void execute(final Runnable task) { // Add task addTask(new Callable() { @Override public Object call() { executorService.execute(new Runnable() { @Override public void run() { try { task.run(); } finally { decrementPendingTaskCount(); } } }); return null; } }); } @Override public boolean awaitTermination(long timeout, TimeUnit unit) throws InterruptedException { // Call service return executorService.awaitTermination(timeout, unit); } @Override public  List> invokeAll(Collection> tasks) throws InterruptedException { // It's ok to increment by just one incrementPendingTaskCount(); try { return executorService.invokeAll(tasks); } finally { decrementPendingTaskCount(); } } @Override public  List> invokeAll( Collection> tasks, long timeout, TimeUnit unit) throws InterruptedException { // It's ok to increment by just one incrementPendingTaskCount(); try { return executorService.invokeAll(tasks, timeout, unit); } finally { decrementPendingTaskCount(); } } @Override public  T invokeAny(Collection> tasks) throws InterruptedException, ExecutionException { // It's ok to increment by just one incrementPendingTaskCount(); try { return executorService.invokeAny(tasks); } finally { decrementPendingTaskCount(); } } @Override public  T invokeAny(Collection> tasks, long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException { incrementPendingTaskCount(); try { return executorService.invokeAny(tasks, timeout, unit); } finally { decrementPendingTaskCount(); } } @Override public boolean isShutdown() { return isShutdown; } @Override public boolean isTerminated() { return executorService.isTerminated(); } @Override public void shutdown() { // Lock write lock writeLock.lock(); // Set as shutdown isShutdown = true; try { if (pendingTasks.get() == 0) { // Real shutdown executorService.shutdown(); } } finally { // Unlock write lock writeLock.unlock(); } } @Override public List shutdownNow() { // Lock write lock writeLock.lock(); // Set as shutdown isShutdown = true; // Unlock write lock writeLock.unlock(); return executorService.shutdownNow(); } @Override public  Future submit(final Callable task) { // Create execution status final FutureExecutionStatus futureExecutionStatus = new FutureExecutionStatus(); // Add task return addTask(new Callable>() { @Override public Future call() { return new FutureDelegate( executorService.submit(new Callable() { @Override public T call() throws Exception { try { // Mark as executed futureExecutionStatus.setExecuted(); // Run the actual task return task.call(); } finally { decrementPendingTaskCount(); } } }), futureExecutionStatus); } }); } @Override public Future submit(final Runnable task) { // Create execution status final FutureExecutionStatus futureExecutionStatus = new FutureExecutionStatus(); // Add task return addTask(new Callable>() { @Override @SuppressWarnings("unchecked") public Future call() { return new FutureDelegate( (Future) executorService.submit(new Runnable() { @Override public void run() { try { // Mark as executed futureExecutionStatus.setExecuted(); // Run the actual task task.run(); } finally { decrementPendingTaskCount(); } } }), futureExecutionStatus); } }); } @Override public  Future submit(final Runnable task, final T result) { // Create execution status final FutureExecutionStatus futureExecutionStatus = new FutureExecutionStatus(); // Add task return addTask(new Callable>() { @Override public Future call() { return new FutureDelegate(executorService.submit( new Runnable() { @Override public void run() { try { // Mark as executed futureExecutionStatus.setExecuted(); // Run the actual task task.run(); } finally { decrementPendingTaskCount(); } } }, result), futureExecutionStatus); } }); } private class FutureExecutionStatus { private volatile boolean executed; public FutureExecutionStatus() { executed = false; } public void setExecuted() { executed = true; } public boolean isExecuted() { return executed; } } private class FutureDelegate implements Future { private Future future; private FutureExecutionStatus executionStatus; public FutureDelegate(Future future, FutureExecutionStatus executionStatus) { this.future = future; this.executionStatus = executionStatus; } @Override public boolean cancel(boolean mayInterruptIfRunning) { boolean cancelled = future.cancel(mayInterruptIfRunning); if (cancelled) { // Lock read lock readLock.lock(); // If task was not executed if (!executionStatus.isExecuted()) { decrementPendingTaskCount(); } // Unlock read lock readLock.unlock(); } return cancelled; } @Override public T get() throws InterruptedException, ExecutionException { return future.get(); } @Override public T get(long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException { return future.get(timeout, unit); } @Override public boolean isCancelled() { return future.isCancelled(); } @Override public boolean isDone() { return future.isDone(); } } } 

Java 7提供了一个适合这个名为Phaser的用例的同步器 。 它是CountDownLatch和CyclicBarrier的可重用混合体,可以增加和减少注册方的数量(类似于可递增的CountDownLatch)。

在此方案中使用相位器的基本模式是在创建时使用移相器注册任务,并在完成时到达 。 当到达方的数量与登记的数量匹配时,相位器“前进”到下一阶段,在进行时通知任何等待线程。

这是我创建的等待递归任务完成的示例。 为了演示目的,它天真地找到Fibonacci序列的前几个数字:

 import java.util.ArrayList; import java.util.List; import java.util.concurrent.ForkJoinPool; import java.util.concurrent.Phaser; import java.util.concurrent.atomic.AtomicLong; /** * An example of using a Phaser to wait for the completion of recursive tasks. * @author Voxelot */ public class PhaserExample { /** Workstealing threadpool with reduced queue contention. */ private static ForkJoinPool executors; /** * @param args the command line arguments */ public static void main(String[] args) throws InterruptedException { executors = new ForkJoinPool(); List sequence = new ArrayList<>(); for (int i = 0; i < 20; i++) { sequence.add(fib(i)); } System.out.println(sequence); } /** * Computes the nth Fibonacci number in the Fibonacci sequence. * @param n The index of the Fibonacci number to compute * @return The computed Fibonacci number */ private static Long fib(int n) throws InterruptedException { AtomicLong result = new AtomicLong(); //Flexible sychronization barrier Phaser phaser = new Phaser(); //Base task Task initialTask = new Task(n, result, phaser); //Register fib(n) calling thread phaser.register(); //Submit base task executors.submit(initialTask); //Make the calling thread arrive at the synchronization //barrier and wait for all future tasks to arrive. phaser.arriveAndAwaitAdvance(); //Get the result of the parallel computation. return result.get(); } private static class Task implements Runnable { /** The Fibonacci sequence index of this task. */ private final int index; /** The shared result of the computation. */ private final AtomicLong result; /** The synchronizer. */ private final Phaser phaser; public Task(int n, AtomicLong result, Phaser phaser) { index = n; this.result = result; this.phaser = phaser; //Inform synchronizer of additional work to complete. phaser.register(); } @Override public void run() { if (index == 1) { result.incrementAndGet(); } else if (index > 1) { //recurrence relation: Fn = Fn-1 + Fn-2 Task task1 = new Task(index - 1, result, phaser); Task task2 = new Task(index - 2, result, phaser); executors.submit(task1); executors.submit(task2); } //Notify synchronizer of task completion. phaser.arrive(); } } } 

你为什么不用柜台? 例如:

 private AtomicInteger counter = new AtomicInteger(0); 

在将任务提交到队列之前将计数器递增1:

 counter.incrementAndGet(); 

并在任务结束时将其减1:

 counter.decrementAndGet(); 

检查将是这样的:

 // ... while (counter.get() > 0); 

Java 7通过其ForkJoinPool执行器集成了对递归任务的支持。 只要任务本身不是太微不足道,它使用起来非常简单并且可以很好地扩展。 本质上,它提供了一个受控接口,允许任务等待任何子任务的完成,而不会无限期地阻塞底层线程。

您链接到的答案中建议的选项之一是使用CompletionService

您可以使用以下命令替换主线程中的忙等待:

 while (true) { Future f = completionService.take(); //blocks until task completes if (executor.getQueue().isEmpty() && numTasks.longValue() == executor.getCompletedTaskCount()) break; } 

请注意, getCompletedTaskCount仅返回一个近似数字,因此您可能需要找到更好的退出条件。

如果你知道要等待的线程数,可以在CountDownLatch的帮助下粘贴一行代码来增加每个线程的数量( http://docs.oracle.com/javase/6/docs/api/java/util/concurrent/ CountDownLatch.html )它可以解决你的问题

由于上一个任务不知道它是最后一个,我实际上并不认为可以100%正确地完成这项工作,而无需在任务启动和完成时记录。

如果内存对我getQueue() ,则getQueue()方法返回一个队列,该队列仅包含仍在等待执行的任务,而不是当前正在运行的任务。 此外, getCompletedTaskCount()是近似值。

我正在思考的解决方案是这样的,使用像Eng.Fouad的答案中的primefaces计数器和用于发出主线程唤醒信号的条件 (为简单起见,请原谅快捷方式):

 public class MyThreadPoolExecutorState { public final Lock lock = new ReentrantLock(); public final Condition workDone = lock.newCondition(); public boolean workIsDone = false; } public class MyThreadPoolExecutor extends ThreadPoolExecutor { private final MyThreadPoolExecutorState state; private final AtomicInteger counter = new AtomicInteger(0); public MyThreadPoolExecutor(MyThreadPoolExecutorState state, ...) { super(...); this.state = state; } protected void beforeExecute(Thread t, Runnable r) { this.counter.incrementAndGet(); } protected void afterExecute(Runnable r, Throwable t) { if(this.counter.decrementAndGet() == 0) { this.state.lock.lock(); try { this.state.workIsDone = true; this.state.workDone.signal(); } finally { this.state.lock.unlock(); } } } } public class MyApp { public static void main(...) { MyThreadPoolExecutorState state = new MyThreadPoolExecutorState(); MyThreadPoolExecutor executor = new MyThreadPoolExecutor(state, ...); // Fire ze missiles! executor.submit(...); state.lock.lock(); try { while(state.workIsDone == false) { state.workDone.await(); } } finally { state.lock.unlock(); } } } 

它可能更优雅一些(可能只是在你的线程池执行器或其他东西中提供一个getState() ),但我认为应该完成工作。 这也是未经测试的,所以要自己承担责任……

值得注意的是,如果没有任务要执行,这个解决方案肯定会失败 – 它将无限期地等待信号。 因此,如果您没有要运行的任务,甚至不必费心启动执行程序。


编辑:第二个想法,递增primefaces计数器应该在提交时发生,而不是在任务执行之前发生(因为排队可能导致计数器过早地降至0)。 替代覆盖submit(...)方法可能是有意义的,并且可能还remove(...)shutdown() (如果使用它们)。 但总体思路仍然相同。 (但我想的越多,它就越不漂亮。)

我还要查看课程的内部,看看你是否可以从中收集任何知识: http : //hg.openjdk.java.net/build-infra/jdk7/jdk/file/0f8da27a3ea3/src/share/ classes / java / util / concurrent / ThreadPoolExecutor.java 。 tryTerminate()方法看起来很有趣。

您可以使用primefaces计数器来计算提交(就像在实际提交之前所说的那样)。 将它与信号量结合并在ThreadPoolExecutor提供的afterExecute钩子中释放它。 在提交第一轮作业后,调用semaphore.acquire( counter.get())而不是忙等待。 但是在调用获取时获取的数量太少,因为计数器可能会在以后增加。 您将不得不循环获取调用,自上次调用以来的增加作为参数,直到计数器不再增加。