ThreadPoolExecutor与ArrayBlockingQueue

我开始从Java Doc中读取更多关于ThreadPoolExecutor的内容,因为我正在我的一个项目中使用它。 那么任何人都可以解释一下这条线实际意味着什么吗? – 我知道每个参数代表什么,但我想从这里的一些专家那里以更一般/非人的方式理解它。

ExecutorService service = new ThreadPoolExecutor(10, 10, 1000L, TimeUnit.MILLISECONDS, new ArrayBlockingQueue(10, true), new ThreadPoolExecutor.CallerRunsPolicy()); 

更新: –问题陈述是: –

每个线程使用介于1和1000之间的唯一ID,程序必须运行60分钟或更长时间,因此在60分钟内,所有ID都可能完成,因此我需要再次重用这些ID。 所以这是我用以上执行器编写的下面的程序。

 class IdPool { private final LinkedList availableExistingIds = new LinkedList(); public IdPool() { for (int i = 1; i <= 1000; i++) { availableExistingIds.add(i); } } public synchronized Integer getExistingId() { return availableExistingIds.removeFirst(); } public synchronized void releaseExistingId(Integer id) { availableExistingIds.add(id); } } class ThreadNewTask implements Runnable { private IdPool idPool; public ThreadNewTask(IdPool idPool) { this.idPool = idPool; } public void run() { Integer id = idPool.getExistingId(); someMethod(id); idPool.releaseExistingId(id); } // This method needs to be synchronized or not? private synchronized void someMethod(Integer id) { System.out.println("Task: " +id); // and do other calcuations whatever you need to do in your program } } public class TestingPool { public static void main(String[] args) throws InterruptedException { int size = 10; int durationOfRun = 60; IdPool idPool = new IdPool(); // create thread pool with given size ExecutorService service = new ThreadPoolExecutor(size, size, 500L, TimeUnit.MILLISECONDS, new ArrayBlockingQueue(size), new ThreadPoolExecutor.CallerRunsPolicy()); // queue some tasks long startTime = System.currentTimeMillis(); long endTime = startTime + (durationOfRun * 60 * 1000L); // Running it for 60 minutes while(System.currentTimeMillis() <= endTime) { service.submit(new ThreadNewTask(idPool)); } // wait for termination service.shutdown(); service.awaitTermination(Long.MAX_VALUE, TimeUnit.DAYS); } } 

我的问题是: – 就考虑性能而言,此代码是正确的吗? 还有什么我可以在这里使它更准确? 任何帮助将不胜感激。

[首先,我道歉,这是对先前答案的回应,但我想要格式化]。

除了在现实中,您不要阻止将项目提交给具有完整队列的ThreadPoolExecutor。 原因是ThreadPoolExecutor调用BlockingQueue.offer(T item)方法,根据定义,该方法是一种非阻塞方法。 它要么添加项目并返回true,要么不添加(填满时)并返回false。 然后,ThreadPoolExecutor调用已注册的RejectedExecutionHandler来处理这种情况。

来自javadoc:

将来某个时候执行给定的任务。 任务可以在新线程或现有池化线程中执行。 如果无法提交执行任务,或者因为此执行程序已关闭或已达到其容量,则该任务由当前的RejectedExecutionHandler处理。

默认情况下,使用ThreadPoolExecutor.AbortPolicy(),它从ThreadPoolExecutor的“submit”或“execute”方法抛出RejectedExecutionException。

 try { executorService.execute(new Runnable() { ... }); } catch (RejectedExecutionException e) { // the queue is full, and you're using the AbortPolicy as the // RejectedExecutionHandler } 

但是,您可以使用其他处理程序执行不同的操作,例如忽略错误(DiscardPolicy),或在调用“execute”或“submit”方法(CallerRunsPolicy)的线程中运行它。 此示例允许在队列已满时调用“submit”或“execute”方法运行所请求任务的任何线程。 (这意味着在任何给定的时间,你可以在池本身的顶部运行另外一件事):

 ExecutorService service = new ThreadPoolExecutor(..., new ThreadPoolExecutor.CallerRunsPolicy()); 

如果你想阻塞并等待,你可以实现你自己的RejectedExecutionHandler,它会阻塞,直到队列中有一个可用的插槽(这是一个粗略的估计,我没有编译或运行它,但你应该明白):

 public class BlockUntilAvailableSlot implements RejectedExecutionHandler { public void rejectedExecution(Runnable r, ThreadPoolExecutor e) { if (e.isTerminated() || e.isShutdown()) { return; } boolean submitted = false; while (! submitted) { if (Thread.currentThread().isInterrupted()) { // be a good citizen and do something nice if we were interrupted // anywhere other than during the sleep method. } try { e.execute(r); submitted = true; } catch (RejectedExceptionException e) { try { // Sleep for a little bit, and try again. Thread.sleep(100L); } catch (InterruptedException e) { ; // do you care if someone called Thread.interrupt? // if so, do something nice here, and maybe just silently return. } } } } } 

它正在创建一个ExecutorService来处理线程池的执行。 在这种情况下,池中的初始和最大线程数都是10。 当池中的线程空闲1秒(1000毫秒)时它将终止它(空闲计时器),但是因为线程的最大和核心数量是相同的,所以这永远不会发生(它总是保持10个线程并且将会从不运行超过10个线程)。

它使用ArrayBlockingQueue来管理10个插槽的执行请求,因此当队列已满(10个线程入队后)时,它将阻止调用者。

如果线程被拒绝(在这种情况下,由于服务关闭,由于线程将排队,或者如果队列已满,则在排队线程时将被阻止),那么提供的Runnable将在调用者的线程上执行。

考虑信号量。 这些是出于同样的目的。 请使用信号量检查下面的代码。 不确定这是不是你想要的。 但如果没有更多的许可证,这将阻止。 对你来说ID也很重要吗?

 import java.util.concurrent.ArrayBlockingQueue; import java.util.concurrent.ExecutorService; import java.util.concurrent.Semaphore; import java.util.concurrent.ThreadPoolExecutor; import java.util.concurrent.TimeUnit; class ThreadNewTask implements Runnable { private Semaphore idPool; public ThreadNewTask(Semaphore idPool) { this.idPool = idPool; } public void run() { // Integer id = idPool.getExistingId(); try { idPool.acquire(); someMethod(0); } catch (InterruptedException e) { e.printStackTrace(); } finally { idPool.release(); } // idPool.releaseExistingId(id); } // This method needs to be synchronized or not? private void someMethod(Integer id) { System.out.println("Task: " + id); // and do other calcuations whatever you need to do in your program } } public class TestingPool { public static void main(String[] args) throws InterruptedException { int size = 10; int durationOfRun = 60; Semaphore idPool = new Semaphore(100); // IdPool idPool = new IdPool(); // create thread pool with given size ExecutorService service = new ThreadPoolExecutor(size, size, 500L, TimeUnit.MILLISECONDS, new ArrayBlockingQueue(size), new ThreadPoolExecutor.CallerRunsPolicy()); // queue some tasks long startTime = System.currentTimeMillis(); long endTime = startTime + (durationOfRun * 60 * 1000L); // Running it for 60 minutes while (System.currentTimeMillis() <= endTime) { service.submit(new ThreadNewTask(idPool)); } // wait for termination service.shutdown(); service.awaitTermination(Long.MAX_VALUE, TimeUnit.DAYS); } }