灵活的CountDownLatch?
我现在遇到了两次问题,即生产者线程生成N个工作项,将它们提交给ExecutorService
,然后需要等到所有N个项都被处理完毕。
注意事项
- N事先不知道 。 如果是这样的话,我只需创建一个
CountDownLatch
,然后让生产者线程await()
直到所有工作完成。 - 使用
CompletionService
是不合适的,因为尽管我的生产者线程需要阻塞(即通过调用take()
),但没有办法表明所有工作都已完成 ,导致生产者线程停止等待。
我目前最喜欢的解决方案是使用整数计数器,并在提交工作项时递增它,并在处理工作项时递减它。 在所有N个任务的提交之后,我的生产者线程将需要等待一个锁,检查counter == 0
是否通知。 如果消费者线程已经递减计数器并且新值为0,则消费者线程将需要通知生产者。
有没有更好的方法来解决这个问题,或者java.util.concurrent
是否有合适的构造我应该使用而不是“滚动自己的”?
提前致谢。
java.util.concurrent.Phaser
看起来适合你。 计划在Java 7中发布,但最稳定的版本可以在jsr166的兴趣小组网站上找到。
移相器是一个美化的循环屏障。 您可以注册N个参与方,并在准备好等待特定阶段的预付款时。
一个关于它如何工作的简单示例:
final Phaser phaser = new Phaser(); public Runnable getRunnable(){ return new Runnable(){ public void run(){ ..do stuff... phaser.arriveAndDeregister(); } }; } public void doWork(){ phaser.register();//register self for(int i=0 ; i < N; i++){ phaser.register(); // register this task prior to execution executor.submit( getRunnable()); } phaser.arriveAndAwaitAdvance(); }
您当然可以使用受AtomicReference
保护的CountDownLatch
,这样您的任务就会被包装:
public class MyTask extends Runnable { private final Runnable r; public MyTask(Runnable r, AtomicReference l) { this.r = r; } public void run() { r.run(); while (l.get() == null) Thread.sleep(1000L); //handle Interrupted l.get().countDown(); } }
请注意 ,任务运行他们的工作然后旋转,直到设置倒计时(即知道任务的总数)。 一旦设置倒计时,它们就会将其计数并退出。 这些提交如下:
AtomicReference l = new AtomicReference (); executor.submit(new MyTask(r, l));
在创建/提交作品之后, 当您知道创建了多少任务时 :
latch.set(new CountDownLatch(nTasks)); latch.get().await();
我已经使用了ExecutorCompletionService这样的东西:
ExecutorCompletionService executor = ...; int count = 0; while (...) { executor.submit(new Processor()); count++; } //Now, pull the futures out of the queue: for (int i = 0; i < count; i++) { executor.take().get(); }
这涉及保留已提交的任务队列,因此如果您的列表任意长,您的方法可能会更好。
但是请确保使用AtomicInteger
进行协调,这样您就可以在一个线程中递增它,并在工作线程中减少它。
我假设您的生产者不需要知道队列何时为空,但需要知道最后一个任务何时完成。
我会向waitforWorkDone(producer)
添加waitforWorkDone(producer)
方法。 生产者可以添加其N个任务并调用wait方法。 如果工作队列不为空并且此刻没有任务正在执行,则wait方法会阻塞传入的线程。
如果任务已完成,则消费者会对waitfor锁上的notifyAll()
进行线程,队列为空,并且没有其他任务正在执行。
你所描述的与使用标准信号量非常相似,但使用了“向后”。
- 您的信号量以0许可开始
- 每个工作单元在完成时释放一个许可证
- 你等待获得N个许可证
此外,您可以灵活地获得M
独立的Java 8+方法,使用单程Phaser
为Stream
完成此操作。 Iterator
/ Iterable
变体完全相同。
public static int submitAndWait(Stream items, Consumer handleItem, Executor executor) { Phaser phaser = new Phaser(1); // 1 = register ourselves items.forEach(item -> { phaser.register(); // new task executor.execute(() -> { handleItem.accept(item); phaser.arrive(); // completed task }); }); phaser.arriveAndAwaitAdvance(); // block until all tasks are complete return phaser.getRegisteredParties()-1; // number of items } ... int recognised = submitAndWait(facesInPicture, this::detectFace, executor)
仅供参考。 这对于单个事件很好,但是,如果同时调用它,涉及ForkJoinPool
的解决方案将阻止父线程阻塞。