在具有等待任务时动态调整java.util.concurrent.ThreadPoolExecutor的大小

我正在使用java.util.concurrent.ThreadPoolExecutor来并行处理多个项目。 尽管线程本身工作正常,但由于线程中发生的操作,我们有时会遇到其他资源限制,这使我们想要调低池中线程的数量。

我想知道在线程实际工作时是否有办法拨打线程数。 我知道您可以调用setMaximumPoolSize()和/或setCorePoolSize() ,但这些只会在线程空闲后调整池的大小,但是在队列中没有任务等待之前它们不会变为空闲。

据我所知,这是一个不太干净的方式是不可能的。

您可以实现beforeExecute方法来检查一些布尔值并强制线程暂时停止。 请记住,它们将包含一个在重新启用之前不会执行的任务。

或者,您可以实现afterExecute以在饱和时抛出RuntimeException。 这将有效地导致Thread死亡,并且由于Executor将高于最大值,因此不会创建新的。

我不建议你这样做。 相反,尝试找到一些其他方法来控制导致问题的任务的并发执行。 可能是在一个单独的线程池中执行它们,并且工作人员数量有限。

你绝对可以。 调用setCorePoolSize(int)将更改池的核心大小。 对此方法的调用是线程安全的,并覆盖提供给ThreadPoolExecutor的构造函数的设置。 如果要修剪池大小,其余线程将在其当前作业队列完成后关闭(如果它们处于空闲状态,它们将立即关闭)。 如果要增加池大小,将尽快分配新线程。 分配新线程的时间范围没有记录 – 但在实现中,每次调用execute方法时都会执行新线程的分配。

要将其与运行时可调参数作业服务器场配对,您可以将此属性(通过包装器或使用动态MBean导出器)公开为读写JMX属性,以创建一个相当不错的,即时可调的批处理器。

要在运行时强制减小池大小(这是您的请求),您必须beforeExecute(Thread,Runnable) ThreadPoolExecutor并对beforeExecute(Thread,Runnable)方法添加中断。 中断线程不是一个充分的中断,因为它只与等待状态交互,并且在处理ThreadPoolExecutor任务线程期间不会进入可中断状态。

我最近遇到了同样的问题,试图在执行所有提交的任务之前强制终止线程池。 为了实现这一点,我通过在将线程的UncaughtExceptionHandler替换为需要我的特定exception并将其丢弃的线程的UncaughtExceptionHandler之后抛出运行时exception来中断线程。

 /** * A runtime exception used to prematurely terminate threads in this pool. */ static class ShutdownException extends RuntimeException { ShutdownException (String message) { super(message); } } /** * This uncaught exception handler is used only as threads are entered into * their shutdown state. */ static class ShutdownHandler implements UncaughtExceptionHandler { private UncaughtExceptionHandler handler; /** * Create a new shutdown handler. * * @param handler The original handler to deligate non-shutdown * exceptions to. */ ShutdownHandler (UncaughtExceptionHandler handler) { this.handler = handler; } /** * Quietly ignore {@link ShutdownException}. * 

* Do nothing if this is a ShutdownException, this is just to prevent * logging an uncaught exception which is expected. Otherwise forward * it to the thread group handler (which may hand it off to the default * uncaught exception handler). *

*/ public void uncaughtException (Thread thread, Throwable throwable) { if (!(throwable instanceof ShutdownException)) { /* Use the original exception handler if one is available, * otherwise use the group exception handler. */ if (handler != null) { handler.uncaughtException(thread, throwable); } } } } /** * Configure the given job as a spring bean. * *

Given a runnable task, configure it as a prototype spring bean, * injecting any necessary dependencices.

* * @param thread The thread the task will be executed in. * @param job The job to configure. * * @throws IllegalStateException if any error occurs. */ protected void beforeExecute (final Thread thread, final Runnable job) { /* If we're in shutdown, it's because spring is in singleton shutdown * mode. This means we must not attempt to configure the bean, but * rather we must exit immediately (prematurely, even). */ if (!this.isShutdown()) { if (factory == null) { throw new IllegalStateException( "This class must be instantiated by spring" ); } factory.configureBean(job, job.getClass().getName()); } else { /* If we are in shutdown mode, replace the job on the queue so the * next process will see it and it won't get dropped. Further, * interrupt this thread so it will no longer process jobs. This * deviates from the existing behavior of shutdown(). */ workQueue.add(job); thread.setUncaughtExceptionHandler( new ShutdownHandler(thread.getUncaughtExceptionHandler()) ); /* Throwing a runtime exception is the only way to prematurely * cause a worker thread from the TheadPoolExecutor to exit. */ throw new ShutdownException("Terminating thread"); } }

在您的情况下,您可能想要创建一个没有许可证的信号量(仅用作线程安全计数器),并且在关闭线程时向其释放许多许可,这些许可对应于先前核心池大小的增量和新池大小(要求您覆盖setCorePoolSize(int)方法)。 这将允许您在当前任务完成后终止线程。

 private Semaphore terminations = new Semaphore(0); protected void beforeExecute (final Thread thread, final Runnable job) { if (terminations.tryAcquire()) { /* Replace this item in the queue so it may be executed by another * thread */ queue.add(job); thread.setUncaughtExceptionHandler( new ShutdownHandler(thread.getUncaughtExceptionHandler()) ); /* Throwing a runtime exception is the only way to prematurely * cause a worker thread from the TheadPoolExecutor to exit. */ throw new ShutdownException("Terminating thread"); } } public void setCorePoolSize (final int size) { int delta = getActiveCount() - size; super.setCorePoolSize(size); if (delta > 0) { terminations.release(delta); } } 

这应该中断n个线程f(n)= active – request 。 如果有任何问题, ThreadPoolExecutor的分配策略相当持久。 它使用finally块来保持过早终止,这可以保证执行。 因此,即使您终止太multithreading,它们也会重新填充。

解决方案是耗尽ThreadPoolExecutor队列,根据需要设置ThreadPoolExecutor大小,然后在其他人结束时逐个添加回线程。 在ThreadPoolExecutor类中排空队列的方法是私有的,因此您必须自己创建它。 这是代码:

 /** * Drains the task queue into a new list. Used by shutdownNow. * Call only while holding main lock. */ public static List drainQueue() { List taskList = new ArrayList(); BlockingQueue workQueue = executor.getQueue(); workQueue.drainTo(taskList); /* * If the queue is a DelayQueue or any other kind of queue * for which poll or drainTo may fail to remove some elements, * we need to manually traverse and remove remaining tasks. * To guarantee atomicity wrt other threads using this queue, * we need to create a new iterator for each element removed. */ while (!workQueue.isEmpty()) { Iterator it = workQueue.iterator(); try { if (it.hasNext()) { Runnable r = it.next(); if (workQueue.remove(r)) taskList.add(r); } } catch (ConcurrentModificationException ignore) { } } return taskList; } 

在调用此方法之前,您需要获取然后释放主锁。 为此,您需要使用javareflection,因为字段“mainLock”是私有的。 再次,这是代码:

 private Field getMainLock() throws NoSuchFieldException { Field mainLock = executor.getClass().getDeclaredField("mainLock"); mainLock.setAccessible(true); return mainLock; } 

“执行者”是你的ThreadPoolExecutor。

现在你需要锁定/解锁方法:

 public void lock() { try { Field mainLock = getMainLock(); Method lock = mainLock.getType().getDeclaredMethod("lock", (Class[])null); lock.invoke(mainLock.get(executor), (Object[])null); } catch { ... } } public void unlock() { try { Field mainLock = getMainLock(); mainLock.setAccessible(true); Method lock = mainLock.getType().getDeclaredMethod("unlock", (Class[])null); lock.invoke(mainLock.get(executor), (Object[])null); } catch { ... } } 

最后,您可以编写“setThreadsNumber”方法,它可以增加和减少ThreadPoolExecutor大小:

 public void setThreadsNumber(int intValue) { boolean increasing = intValue > executor.getPoolSize(); executor.setCorePoolSize(intValue); executor.setMaximumPoolSize(intValue); if(increasing){ if(drainedQueue != null && (drainedQueue.size() > 0)){ executor.submit(drainedQueue.remove(0)); } } else { if(drainedQueue == null){ lock(); drainedQueue = drainQueue(); unlock(); } } } 

注意:很明显,如果执行N个并行线程并将此数字更改为N-1,则所有N个线程将继续运行。 当第一个线程结束时,不会执行新线程。 从现在开始,并行线程的数量将是您选择的数量。

我阅读了setMaximumPoolSize()和setCorePoolSize()的文档,看起来它们可以产生你需要的行为。

– 编辑 –

我的结论是错误的:请参阅下面的讨论了解详情……

我也需要相同的解决方案,似乎在JDK8中setCorePoolSize()和setMaximumPoolSize()确实产生了所需的结果。 我做了一个测试用例,我向池中提交了4个任务,并且它们执行得很顺利,我在运行时缩小了池的大小,然后提交了另一个我想要寂寞的可运行的东西。 然后我将池恢复到原始大小。 这是测试源https://gist.github.com/southerton81/96e141b8feede3fe0b8f88f679bef381

它产生以下输出(线程“50”是应该独立执行的那个)

 run: test thread 2 enter test thread 1 enter test thread 3 enter test thread 4 enter test thread 1 exit test thread 2 exit test thread 3 exit test thread 4 exit test thread 50 enter test thread 50 exit test thread 1 enter test thread 2 enter test thread 3 enter test thread 4 enter test thread 1 exit test thread 2 exit test thread 3 exit test thread 4 exit