生产者/消费者工作队列

我正在努力实现我的处理管道的最佳方法。

我的制作人将作品提供给BlockingQueue。 在消费者方面,我轮询队列,包装我在Runnable任务中获得的内容,并将其提交给ExecutorService。

while (!isStopping()) { String work = workQueue.poll(1000L, TimeUnit.MILLISECONDS); if (work == null) { break; } executorService.execute(new Worker(work)); // needs to block if no threads! } 

这不理想; 当然,ExecutorService有自己的队列,所以真正发生的事情是我总是完全耗尽工作队列并填充任务队列,随着任务的完成,队列会慢慢排空。

我意识到我可以在生产者端排队任务,但我真的不愿意这样做 – 我喜欢我的工作队列的间接/隔离是愚蠢的字符串; 这真的不是制片人的任何事情会发生在他们身上。 迫使生产者对Runnable或Callable进行排队会打破抽象,恕我直言。

但我确实希望共享工作队列代表当前的处理状态。 如果消费者没有跟上,我希望能够阻止生产者。

我喜欢使用Executors,但我觉得我正在与他们的设计作斗争。 我可以部分喝Kool-ade,还是我必须吞下它? 我是否在拒绝排队任务方面做错了? (我怀疑我可以设置ThreadPoolExecutor来使用1任务队列并覆盖它的执行方法来阻止而不是拒绝队列满,但这感觉很糟糕。)

建议?

我希望共享工作队列代表当前的处理状态。

尝试使用共享的BlockingQueue并使用一个工作线程池来从队列中取出工作项。

如果消费者没有跟上,我希望能够阻止生产者。

ArrayBlockingQueue和LinkedBlockingQueue都支持有界队列,这样它们就会在满时阻塞。 使用blocking put()方法可确保在队列已满时阻止生成器。

这是一个艰难的开始。 您可以调整工作线程数和队列大小:

 public class WorkerTest { private final BlockingQueue workQueue; private final ExecutorService service; public WorkerTest(int numWorkers, int workQueueSize) { workQueue = new LinkedBlockingQueue(workQueueSize); service = Executors.newFixedThreadPool(numWorkers); for (int i=0; i < numWorkers; i++) { service.submit(new Worker(workQueue)); } } public void produce(T item) { try { workQueue.put(item); } catch (InterruptedException ex) { Thread.currentThread().interrupt(); } } private static class Worker implements Runnable { private final BlockingQueue workQueue; public Worker(BlockingQueue workQueue) { this.workQueue = workQueue; } @Override public void run() { while (!Thread.currentThread().isInterrupted()) { try { T item = workQueue.take(); // Process item } catch (InterruptedException ex) { Thread.currentThread().interrupt(); break; } } } } } 

“找到一个可用的现有工作线程(如果存在),必要时创建一个,如果它们空闲则杀死它们。”

管理所有这些工人国家是不必要的,因为它是危险的。 我会创建一个不断在后台运行的监视器线程,其唯一的任务是填充队列并产生消费者……为什么不制作工作线程守护程序,以便它们一旦完成就会死掉? 如果将它们全部附加到一个ThreadGroup,则可以动态地重新调整池的大小…例如:

  **for(int i=0; i 

您可以让您的消费者直接执行Runnable::run而不是启动新线程。 将它与最大尺寸的阻塞队列结合起来,我认为你会得到你想要的。 您的使用者将成为根据队列中的工作项内联执行任务的工作者。 他们只会在处理项目时尽快将项目出列,因此当您的消费者停止消费时,您的生产者就会出现。