创建动态(增长/缩小)线程池

我需要在Java中实现一个线程池(java.util.concurrent),其空闲时线程数达到某个最小值,当作业提交到它上面的速度超过完成执行时,它会增长到上限(但绝不会更远) ,当完成所有作业并且不再提交作业时,缩小回到下限。

你会如何实现这样的东西? 我想这将是一个相当常见的使用场景,但显然java.util.concurrent.Executors工厂方法只能创建固定大小的池和池,当提交许多作业时,这些池和池会无限增长。 ThreadPoolExecutor类提供了corePoolSizemaximumPoolSize参数,但是它的文档似乎暗示了同时拥有多个corePoolSize线程的唯一方法是使用有界作业队列,在这种情况下,如果你已达到maximumPoolSize线程,你会得到拒绝工作,你必须自己处理? 我想出了这个:

 //pool creation ExecutorService pool = new ThreadPoolExecutor(minSize, maxSize, 500, TimeUnit.MILLISECONDS, new ArrayBlockingQueue(minSize)); ... //submitting jobs for (Runnable job : ...) { while (true) { try { pool.submit(job); System.out.println("Job " + job + ": submitted"); break; } catch (RejectedExecutionException e) { // maxSize jobs executing concurrently atm.; re-submit new job after short wait System.out.println("Job " + job + ": rejected..."); try { Thread.sleep(300); } catch (InterruptedException e1) { } } } } 

我忽略了什么吗? 有一个更好的方法吗? 此外,根据一个人的要求,上述代码至少(我认为) (total number of jobs) - maxSize作业完成后才会完成,这可能会有问题。 因此,如果您希望能够将任意数量的作业提交到池中并立即进行而无需等待其中任何一个完成,我看不出如何在没有专门的“作业总结”线程的情况下执行此操作保存所有提交的作业所需的无限队列。 AFAICS,如果你为ThreadPoolExecutor本身使用一个无界的队列,它的线程数将永远不会超过corePoolSize。

可能对您有用的一个技巧是分配一个RejectedExecutionHandler ,它使用相同的线程将作业提交到阻塞队列。 这将阻止当前线程并消除对某种循环的需要。

在这里看到我的答案:

如果需要处理太多数据,我如何让ThreadPoolExecutor命令等待?

这是从该答案中复制的拒绝处理程序。

 final BlockingQueue queue = new ArrayBlockingQueue(200); ThreadPoolExecutor threadPool = new ThreadPoolExecutor(nThreads, nThreads, 0L, TimeUnit.MILLISECONDS, queue); // by default (unfortunately) the ThreadPoolExecutor will call the rejected // handler when you submit the 201st job, to have it block you do: threadPool.setRejectedExecutionHandler(new RejectedExecutionHandler() { public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) { // this will block if the queue is full executor.getQueue().put(r); } }); 

然后,只要您意识到在核心线程上方创建任何线程之前首先使用的有界阻塞队列填满,您就应该能够使用核心/最大线程数。 因此,如果您有10个核心线程并且您希望第11个作业启动第11个线程,那么您将需要一个大小为0的阻塞队列(可能是SynchronousQueue )。 我觉得这是其他伟大的ExecutorService类的真正限制。

当增长和缩小与线程一起出现时,只有一个名字出现在我的脑海中: 来自java.util.concurrent包的CachedThreadPool

 ExecutorService executor = Executors.newCachedThreadPool(); 

CachedThreadPool()可以重用该线程 ,并在需要时创建新线程 。 是的,如果一个线程闲置60秒,CachedThreadPool将终止它。 所以这很轻巧 – 用你的话来说是成长和缩小!

maximumPoolSize设置为Integer.MAX_VALUE 。 如果你有超过20亿个线程……那么,祝你好运。

无论如何, ThreadPoolExecutor的Javadoc声明:

通过将maximumPoolSize设置为基本无限制的值(例如Integer.MAX_VALUE),可以允许池容纳任意数量的并发任务。 最典型的情况是,核心和最大池大小仅在构造时设置,但也可以使用setCorePoolSize(int)和setMaximumPoolSize(int)动态更改。

使用类似于LinkedBlockingQueue的类似无界任务队列,这应该具有任意大容量。