具有有界队列的Java线程池
我正在使用java.util.concurrent
的Executors
类来创建一个固定的线程池,用于运行Web服务器的请求处理程序:
static ExecutorService newFixedThreadPool(int nThreads)
而描述是:
创建一个线程池,该线程池重用一组在共享无界队列中运行的固定线程。
但是,我正在寻找线程池实现,它将执行完全相同的操作,除了有界队列。 有没有这样的实施? 或者我是否需要为固定线程池实现自己的包装器?
你想要做的是新建自己的ExecutorService,可能使用ThreadPoolExecutor 。 ThreadPoolExecutor有一个构造函数,它接受一个BlockingQueue并获得一个有界的队列,例如正确构造用于边界的ArrayBlockingQueue 。 您还可以包含RejectedExecutionHandler ,以确定队列已满时要执行的操作,或者挂起对阻塞队列的引用并使用offer方法。
这是一个小例子:
BlockingQueue linkedBlockingDeque = new LinkedBlockingDeque ( 100); ExecutorService executorService = new ThreadPoolExecutor(1, 10, 30, TimeUnit.SECONDS, linkedBlockingDeque, new ThreadPoolExecutor.CallerRunsPolicy());
创建一个ThreadPoolexecutor并在其中传递合适的BlockingQueue实现。 例如,您可以在ThreadPoolExecutor构造函数中传入ArrayBlockingQueue以获得所需的效果。
当您创建ThreadPoolExecutor时,您可以为其提供有界的BlockingQueue和RejectedExecutionHandler,以便您可以控制达到限制时发生的情况。 默认行为是抛出RejectedExecutionException。
您还可以定义自己的线程工厂来控制线程名称并使它们成为守护程序线程。
我用Semaphore解决了这个问题,我用它来限制提交给ExecutorService
任务。
例如:
int threadCount = 10; ExecutorService producerPool = Executors.newSingleThreadedExecutor(); ExecutorService consumerPool = Executors.newFixedThreadPool(threadCount); // set the permit count greater than thread count so that we // build up a limited buffer of waiting consumers Semaphore semaphore = new Semaphore(threadCount * 100); Runnable producer = () -> { for (int i = 0; i < 1000000; ++i) { semaphore.acquire(); Runnable consumer = () -> { try { doSomeWork(i); } finally { semaphore.release(); } }; consumerPool.submit(consumer); } } Future future = producerPool.submit(producer); // all consumers added to the pool when this returns future.get(); producerPool.shutdown(); producerPool.awaitTermination(Long.MAX_VALUE, TimeUnit.MILLISECONDS); consumerPool.shutdown(); // all consumers finished when this returns consumerPool.awaitTermination(Long.MAX_VALUE, TimeUnit.MILLISECONDS);