ExecutorService,避免任务队列过满的标准方法

我正在使用ExecutorService来简化并发multithreading程序。 请使用以下代码:

 while(xxx) { ExecutorService exService = Executors.newFixedThreadPool(NUMBER_THREADS); ... Future ... = exService.submit(..); ... } 

在我的情况下,问题是如果所有NUMBER_THREADS都被占用,则submit()不会阻塞。 结果是任务队列被许多任务淹没。 这样做的结果是,使用ExecutorService.shutdown()关闭执行服务需要很长时间( ExecutorService.isTerminated()将长时间处于假状态)。 原因是任务队列仍然很满。

现在我的解决方法是使用信号量来禁止在ExecutorService的任务队列中有许多条目:

 ... Semaphore semaphore=new Semaphore(NUMBER_THREADS); while(xxx) { ExecutorService exService = Executors.newFixedThreadPool(NUMBER_THREADS); ... semaphore.aquire(); // internally the task calls a finish callback, which invokes semaphore.release() // -> now another task is added to queue Future ... = exService.submit(..); ... } 

我相信有更好的封装解决方案?

诀窍是使用固定的队列大小和:

 new ThreadPoolExecutor.CallerRunsPolicy() 

我还建议使用Guava的ListeningExecutorService 。 以下是消费者/生产者队列的示例。

 private ListeningExecutorService producerExecutorService = MoreExecutors.listeningDecorator(newFixedThreadPoolWithQueueSize(5, 20)); private ListeningExecutorService consumerExecutorService = MoreExecutors.listeningDecorator(newFixedThreadPoolWithQueueSize(5, 20)); private static ExecutorService newFixedThreadPoolWithQueueSize(int nThreads, int queueSize) { return new ThreadPoolExecutor(nThreads, nThreads, 5000L, TimeUnit.MILLISECONDS, new ArrayBlockingQueue(queueSize, true), new ThreadPoolExecutor.CallerRunsPolicy()); } 

更好的是你可能想要像RabbitMQ或ActiveMQ这样的MQ,因为他们有QoS技术。

您可以调用ThreadPoolExecutor.getQueue().size()来查找等待队列的大小。 如果队列太长,您可以采取行动。 如果队列太长而不能减慢生产者的速度(如果合适的话),我建议在当前线程中运行任务。

你最好自己创建ThreadPoolExecutor (这是Executors.newXXX()无论如何都要做的)。

在构造函数中,您可以传入BlockingQueue以供Executor用作其任务队列。 如果传入一个大小受限的BlockingQueue(如LinkedBlockingQueue ),它应该达到你想要的效果。

 ExecutorService exService = new ThreadPoolExecutor(NUMBER_THREADS, NUMBER_THREADS, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue(workQueueSize)); 

一个真正的阻塞ThreadPoolExecutor已经列入许多人的心愿单,甚至还有一个JDC错误。 我遇到了同样的问题,并遇到了这个问题: http : //today.java.net/pub/a/today/2008/10/23/creating-a-notifying-blocking-thread-pool-executor.html

它是BlockingThreadPoolExecutor的一个实现,使用RejectionPolicy实现,它使用offer将任务添加到队列,等待队列有空间。 看起来不错。

你可以添加另一个有限大小的bloquing队列来控制executorService中内部队列的大小,有些人认为信号量很容易。 在执行者之前,你把()和任务交给()。 take()必须在任务代码中