灵活的CountDownLatch?

我现在遇到了两次问题,即生产者线程生成N个工作项,将它们提交给ExecutorService ,然后需要等到所有N个项都被处理完毕。

注意事项

  • N事先不知道 。 如果是这样的话,我只需创建一个CountDownLatch ,然后让生产者线程await()直到所有工作完成。
  • 使用CompletionService是不合适的,因为尽管我的生产者线程需要阻塞(即通过调用take() ),但没有办法表明所有工作都已完成 ,导致生产者线程停止等待。

我目前最喜欢的解决方案是使用整数计数器,并在提交工作项时递增它,并在处理工作项时递减它。 在所有N个任务的提交之后,我的生产者线程将需要等待一个锁,检查counter == 0是否通知。 如果消费者线程已经递减计数器并且新值为0,则消费者线程将需要通知生产者。

有没有更好的方法来解决这个问题,或者java.util.concurrent是否有合适的构造我应该使用而不是“滚动自己的”?

提前致谢。

java.util.concurrent.Phaser看起来适合你。 计划在Java 7中发布,但最稳定的版本可以在jsr166的兴趣小组网站上找到。

移相器是一个美化的循环屏障。 您可以注册N个参与方,并在准备好等待特定阶段的预付款时。

一个关于它如何工作的简单示例:

 final Phaser phaser = new Phaser(); public Runnable getRunnable(){ return new Runnable(){ public void run(){ ..do stuff... phaser.arriveAndDeregister(); } }; } public void doWork(){ phaser.register();//register self for(int i=0 ; i < N; i++){ phaser.register(); // register this task prior to execution executor.submit( getRunnable()); } phaser.arriveAndAwaitAdvance(); } 

您当然可以使用受AtomicReference保护的CountDownLatch ,这样您的任务就会被包装:

 public class MyTask extends Runnable { private final Runnable r; public MyTask(Runnable r, AtomicReference l) { this.r = r; } public void run() { r.run(); while (l.get() == null) Thread.sleep(1000L); //handle Interrupted l.get().countDown(); } } 

请注意 ,任务运行他们的工作然后旋转,直到设置倒计时(即知道任务的总数)。 一旦设置倒计时,它们就会将其计数并退出。 这些提交如下:

 AtomicReference l = new AtomicReference(); executor.submit(new MyTask(r, l)); 

创建/提交作品之后, 当您知道创建了多少任务时

 latch.set(new CountDownLatch(nTasks)); latch.get().await(); 

我已经使用了ExecutorCompletionService这样的东西:

 ExecutorCompletionService executor = ...; int count = 0; while (...) { executor.submit(new Processor()); count++; } //Now, pull the futures out of the queue: for (int i = 0; i < count; i++) { executor.take().get(); } 

这涉及保留已提交的任务队列,因此如果您的列表任意长,您的方法可能会更好。

但是请确保使用AtomicInteger进行协调,这样您就可以在一个线程中递增它,并在工作线程中减少它。

我假设您的生产者不需要知道队列何时为空,但需要知道最后一个任务何时完成。

我会向waitforWorkDone(producer)添加waitforWorkDone(producer)方法。 生产者可以添加其N个任务并调用wait方法。 如果工作队列不为空并且此刻没有任务正在执行,则wait方法会阻塞传入的线程。

如果任务已完成,则消费者会对waitfor锁上的notifyAll()进行线程,队列为空,并且没有其他任务正在执行。

你所描述的与使用标准信号量非常相似,但使用了“向后”。

  • 您的信号量以0许可开始
  • 每个工作单元在完成时释放一个许可证
  • 你等待获得N个许可证

此外,您可以灵活地获得M

独立的Java 8+方法,使用单程PhaserStream完成此操作。 Iterator / Iterable变体完全相同。

 public static  int submitAndWait(Stream items, Consumer handleItem, Executor executor) { Phaser phaser = new Phaser(1); // 1 = register ourselves items.forEach(item -> { phaser.register(); // new task executor.execute(() -> { handleItem.accept(item); phaser.arrive(); // completed task }); }); phaser.arriveAndAwaitAdvance(); // block until all tasks are complete return phaser.getRegisteredParties()-1; // number of items } ... int recognised = submitAndWait(facesInPicture, this::detectFace, executor) 

仅供参考。 这对于单个事件很好,但是,如果同时调用它,涉及ForkJoinPool的解决方案将阻止父线程阻塞。