优雅地将队列长度指示符实现到ExecutorServices

为什么,为什么java.util.concurrent没有为其ExecutorService提供队列长度指示器? 最近我发现自己做了这样的事情:

 ExecutorService queue = Executors.newSingleThreadExecutor(); AtomicInteger queueLength = new AtomicInteger(); ... public void addTaskToQueue(Runnable runnable) { if (queueLength.get() < MAX_QUEUE_LENGTH) { queueLength.incrementAndGet(); // Increment queue when submitting task. queue.submit(new Runnable() { public void run() { runnable.run(); queueLength.decrementAndGet(); // Decrement queue when task done. } }); } else { // Trigger error: too long queue } } 

哪个工作正常,但是……我认为这应该作为ExecutorService的一部分实现。 这是一个愚蠢的错误,容易携带一个与实际队列分开的计数器,计数器应该指示它的长度(让我想起C数组)。 但是, ExecutorService是通过静态工厂方法获得的,因此无法简单地扩展优秀的单线程执行器并添加队列计数器。 所以我该怎么做:

  1. 重新发明已经在JDK中实现的东西?
  2. 其他聪明的解决方案?

还有一种更直接的方式:

 ThreadPoolExecutor executor = (ThreadPoolExecutor) Executors.newSingleThreadExecutor(); // add jobs // ... int size = executor.getQueue().size(); 

虽然您可能考虑不使用Executor的方便创建方法,而是直接创建执行程序以摆脱强制转换,因此确保执行程序将始终实际上是ThreadPoolExecutor ,即使Executors.newSingleThreadExecutor的实现会改变有一天。

 ThreadPoolExecutor executor = new ThreadPoolExecutor( 1, 1, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue() ); 

这是从JDK 1.6中的Executors.newSingleThreadExecutor直接复制的。 传递给构造函数的LinkedBlockingQueue实际上是您将从getQueue返回的对象。

虽然您可以直接检查队列大小。 处理过长的队列的另一种方法是使内部队列受限制。

 public static ExecutorService newFixedThreadPoolWithQueueSize(int nThreads, int queueSize) { return new ThreadPoolExecutor(nThreads, nThreads, 5000L, TimeUnit.MILLISECONDS, new ArrayBlockingQueue(queueSize, true)); } 

当超出限制时,这将导致RejectedExecutionExceptions(请参阅此 )。

如果要避免exception,可以劫持调用线程来执行该函数。 请参阅此SO问题以获得解决方案。

参考

  • 的ThreadPoolExecutor