执行程序:如果递归创建任务,如何同步等待所有任务完成?
我的问题与此问题密切相关。 正如在那里发布的那样,我希望主线程等到工作队列为空并且所有任务都已完成。 然而,在我的情况下,问题是每个任务可以递归地导致提交新任务以进行处理。 这使收集所有这些任务的未来变得有点尴尬。
我们当前的解决方案使用忙等待循环来等待终止:
do { //Wait until we are done the processing try { Thread.sleep(200); } catch (InterruptedException e) { throw new RuntimeException(e); } } while (!executor.getQueue().isEmpty() || numTasks.longValue() > executor.getCompletedTaskCount());
numTasks是一个在创建每个新任务时增加的值。 这有效但我认为由于忙碌的等待而不是很好。 我想知道是否有一种好方法可以使主线程同步等待,直到被明确唤醒。
非常感谢你的所有建议!
最后,我选择了一些我认为相当简单的东西。 我发现CountDownLatch几乎就是我所需要的。 它会阻塞,直到计数器达到0.唯一的问题是它只能倒计时,而不是向上,因此在动态设置中不起作用,我可以在任务中提交新任务。 因此我实现了一个新的类CountLatch
,它提供了额外的function。 (见下文)这个课我然后使用如下。
主线程调用latch.awaitZero()
,阻塞直到latch达到0。
任何线程,在调用executor.execute(..)
之前调用latch.increment()
。
在完成之前的任何任务都调用latch.decrement()
。
当最后一个任务终止时,计数器将达到0,从而释放主线程。
欢迎提供进一步的建议和反馈!
public class CountLatch { @SuppressWarnings("serial") private static final class Sync extends AbstractQueuedSynchronizer { Sync(int count) { setState(count); } int getCount() { return getState(); } protected int tryAcquireShared(int acquires) { return (getState() == 0) ? 1 : -1; } protected int acquireNonBlocking(int acquires) { // increment count for (;;) { int c = getState(); int nextc = c + 1; if (compareAndSetState(c, nextc)) return 1; } } protected boolean tryReleaseShared(int releases) { // Decrement count; signal when transition to zero for (;;) { int c = getState(); if (c == 0) return false; int nextc = c - 1; if (compareAndSetState(c, nextc)) return nextc == 0; } } } private final Sync sync; public CountLatch(int count) { this.sync = new Sync(count); } public void awaitZero() throws InterruptedException { sync.acquireSharedInterruptibly(1); } public boolean awaitZero(long timeout, TimeUnit unit) throws InterruptedException { return sync.tryAcquireSharedNanos(1, unit.toNanos(timeout)); } public void increment() { sync.acquireNonBlocking(1); } public void decrement() { sync.releaseShared(1); } public String toString() { return super.toString() + "[Count = " + sync.getCount() + "]"; } }
请注意, increment()
/ decrement()
调用可以封装到自定义的Executor
子类中,例如,由Sami Korhonen建议,或者由impl建议的beforeExecute
和afterExecute
。 看这里:
public class CountingThreadPoolExecutor extends ThreadPoolExecutor { protected final CountLatch numRunningTasks = new CountLatch(0); public CountingThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue workQueue) { super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue); } @Override public void execute(Runnable command) { numRunningTasks.increment(); super.execute(command); } @Override protected void afterExecute(Runnable r, Throwable t) { numRunningTasks.decrement(); super.afterExecute(r, t); } /** * Awaits the completion of all spawned tasks. */ public void awaitCompletion() throws InterruptedException { numRunningTasks.awaitZero(); } /** * Awaits the completion of all spawned tasks. */ public void awaitCompletion(long timeout, TimeUnit unit) throws InterruptedException { numRunningTasks.awaitZero(timeout, unit); } }
这个实际上是一个非常有趣的问题需要解决。 我必须警告我没有完全测试代码。
想法是简单地跟踪任务执行:
- 如果任务成功排队,则计数器加1
- 如果任务被取消且尚未执行,则计数器减1
- 如果任务已执行,则计数器减1
当调用shutdown并且有待处理的任务时,delegate不会在实际的ExecutorService上调用shutdown。 它将允许排队新任务,直到挂起的任务计数达到零,并在实际的ExecutorService上调用shutdown。
public class ResilientExecutorServiceDelegate implements ExecutorService { private final ExecutorService executorService; private final AtomicInteger pendingTasks; private final Lock readLock; private final Lock writeLock; private boolean isShutdown; public ResilientExecutorServiceDelegate(ExecutorService executorService) { ReadWriteLock readWriteLock = new ReentrantReadWriteLock(); this.pendingTasks = new AtomicInteger(); this.readLock = readWriteLock.readLock(); this.writeLock = readWriteLock.writeLock(); this.executorService = executorService; this.isShutdown = false; } private T addTask(Callable task) { T result; boolean success = false; // Increment pending tasks counter incrementPendingTaskCount(); try { // Call service result = task.call(); success = true; } catch (RuntimeException exception) { throw exception; } catch (Exception exception) { throw new RejectedExecutionException(exception); } finally { if (!success) { // Decrement pending tasks counter decrementPendingTaskCount(); } } return result; } private void incrementPendingTaskCount() { pendingTasks.incrementAndGet(); } private void decrementPendingTaskCount() { readLock.lock(); if (pendingTasks.decrementAndGet() == 0 && isShutdown) { try { // Shutdown executorService.shutdown(); } catch (Throwable throwable) { } } readLock.unlock(); } @Override public void execute(final Runnable task) { // Add task addTask(new Callable
Java 7提供了一个适合这个名为Phaser的用例的同步器 。 它是CountDownLatch和CyclicBarrier的可重用混合体,可以增加和减少注册方的数量(类似于可递增的CountDownLatch)。
在此方案中使用相位器的基本模式是在创建时使用移相器注册任务,并在完成时到达 。 当到达方的数量与登记的数量匹配时,相位器“前进”到下一阶段,在进行时通知任何等待线程。
这是我创建的等待递归任务完成的示例。 为了演示目的,它天真地找到Fibonacci序列的前几个数字:
import java.util.ArrayList; import java.util.List; import java.util.concurrent.ForkJoinPool; import java.util.concurrent.Phaser; import java.util.concurrent.atomic.AtomicLong; /** * An example of using a Phaser to wait for the completion of recursive tasks. * @author Voxelot */ public class PhaserExample { /** Workstealing threadpool with reduced queue contention. */ private static ForkJoinPool executors; /** * @param args the command line arguments */ public static void main(String[] args) throws InterruptedException { executors = new ForkJoinPool(); List sequence = new ArrayList<>(); for (int i = 0; i < 20; i++) { sequence.add(fib(i)); } System.out.println(sequence); } /** * Computes the nth Fibonacci number in the Fibonacci sequence. * @param n The index of the Fibonacci number to compute * @return The computed Fibonacci number */ private static Long fib(int n) throws InterruptedException { AtomicLong result = new AtomicLong(); //Flexible sychronization barrier Phaser phaser = new Phaser(); //Base task Task initialTask = new Task(n, result, phaser); //Register fib(n) calling thread phaser.register(); //Submit base task executors.submit(initialTask); //Make the calling thread arrive at the synchronization //barrier and wait for all future tasks to arrive. phaser.arriveAndAwaitAdvance(); //Get the result of the parallel computation. return result.get(); } private static class Task implements Runnable { /** The Fibonacci sequence index of this task. */ private final int index; /** The shared result of the computation. */ private final AtomicLong result; /** The synchronizer. */ private final Phaser phaser; public Task(int n, AtomicLong result, Phaser phaser) { index = n; this.result = result; this.phaser = phaser; //Inform synchronizer of additional work to complete. phaser.register(); } @Override public void run() { if (index == 1) { result.incrementAndGet(); } else if (index > 1) { //recurrence relation: Fn = Fn-1 + Fn-2 Task task1 = new Task(index - 1, result, phaser); Task task2 = new Task(index - 2, result, phaser); executors.submit(task1); executors.submit(task2); } //Notify synchronizer of task completion. phaser.arrive(); } } }
你为什么不用柜台? 例如:
private AtomicInteger counter = new AtomicInteger(0);
在将任务提交到队列之前将计数器递增1:
counter.incrementAndGet();
并在任务结束时将其减1:
counter.decrementAndGet();
检查将是这样的:
// ... while (counter.get() > 0);
Java 7通过其ForkJoinPool执行器集成了对递归任务的支持。 只要任务本身不是太微不足道,它使用起来非常简单并且可以很好地扩展。 本质上,它提供了一个受控接口,允许任务等待任何子任务的完成,而不会无限期地阻塞底层线程。
您链接到的答案中建议的选项之一是使用CompletionService
您可以使用以下命令替换主线程中的忙等待:
while (true) { Future> f = completionService.take(); //blocks until task completes if (executor.getQueue().isEmpty() && numTasks.longValue() == executor.getCompletedTaskCount()) break; }
请注意, getCompletedTaskCount
仅返回一个近似数字,因此您可能需要找到更好的退出条件。
如果你知道要等待的线程数,可以在CountDownLatch的帮助下粘贴一行代码来增加每个线程的数量( http://docs.oracle.com/javase/6/docs/api/java/util/concurrent/ CountDownLatch.html )它可以解决你的问题
由于上一个任务不知道它是最后一个,我实际上并不认为可以100%正确地完成这项工作,而无需在任务启动和完成时记录。
如果内存对我getQueue()
,则getQueue()
方法返回一个队列,该队列仅包含仍在等待执行的任务,而不是当前正在运行的任务。 此外, getCompletedTaskCount()
是近似值。
我正在思考的解决方案是这样的,使用像Eng.Fouad的答案中的primefaces计数器和用于发出主线程唤醒信号的条件 (为简单起见,请原谅快捷方式):
public class MyThreadPoolExecutorState { public final Lock lock = new ReentrantLock(); public final Condition workDone = lock.newCondition(); public boolean workIsDone = false; } public class MyThreadPoolExecutor extends ThreadPoolExecutor { private final MyThreadPoolExecutorState state; private final AtomicInteger counter = new AtomicInteger(0); public MyThreadPoolExecutor(MyThreadPoolExecutorState state, ...) { super(...); this.state = state; } protected void beforeExecute(Thread t, Runnable r) { this.counter.incrementAndGet(); } protected void afterExecute(Runnable r, Throwable t) { if(this.counter.decrementAndGet() == 0) { this.state.lock.lock(); try { this.state.workIsDone = true; this.state.workDone.signal(); } finally { this.state.lock.unlock(); } } } } public class MyApp { public static void main(...) { MyThreadPoolExecutorState state = new MyThreadPoolExecutorState(); MyThreadPoolExecutor executor = new MyThreadPoolExecutor(state, ...); // Fire ze missiles! executor.submit(...); state.lock.lock(); try { while(state.workIsDone == false) { state.workDone.await(); } } finally { state.lock.unlock(); } } }
它可能更优雅一些(可能只是在你的线程池执行器或其他东西中提供一个getState()
),但我认为应该完成工作。 这也是未经测试的,所以要自己承担责任……
值得注意的是,如果没有任务要执行,这个解决方案肯定会失败 – 它将无限期地等待信号。 因此,如果您没有要运行的任务,甚至不必费心启动执行程序。
编辑:第二个想法,递增primefaces计数器应该在提交时发生,而不是在任务执行之前发生(因为排队可能导致计数器过早地降至0)。 替代覆盖submit(...)
方法可能是有意义的,并且可能还remove(...)
和shutdown()
(如果使用它们)。 但总体思路仍然相同。 (但我想的越多,它就越不漂亮。)
我还要查看课程的内部,看看你是否可以从中收集任何知识: http : //hg.openjdk.java.net/build-infra/jdk7/jdk/file/0f8da27a3ea3/src/share/ classes / java / util / concurrent / ThreadPoolExecutor.java 。 tryTerminate()
方法看起来很有趣。
您可以使用primefaces计数器来计算提交(就像在实际提交之前所说的那样)。 将它与信号量结合并在ThreadPoolExecutor
提供的afterExecute
钩子中释放它。 在提交第一轮作业后,调用semaphore.acquire( counter.get())
而不是忙等待。 但是在调用获取时获取的数量太少,因为计数器可能会在以后增加。 您将不得不循环获取调用,自上次调用以来的增加作为参数,直到计数器不再增加。