在ThreadPoolExecutor中测试PriorityBlockingQueue

我在这个例子中实现了我的ThreadPoolExecutor和PriorityBlockingQueue: https : //stackoverflow.com/a/12722648/2206775

并写了一个测试:

PriorityExecutor executorService = (PriorityExecutor) PriorityExecutor.newFixedThreadPool(16); executorService.submit(new Runnable() { @Override public void run() { try { Thread.sleep(1000); Thread.sleep(1000); System.out.println("1"); } catch (InterruptedException e) { e.printStackTrace(); } } }, 1); executorService.submit(new Runnable() { @Override public void run() { try { Thread.sleep(1000); Thread.sleep(1000); System.out.println("3"); } catch (InterruptedException e) { e.printStackTrace(); } } }, 3); executorService.submit(new Runnable() { @Override public void run() { try { Thread.sleep(1000); Thread.sleep(1000); System.out.println("2"); } catch (InterruptedException e) { e.printStackTrace(); } } }, 2); executorService.submit(new Runnable() { @Override public void run() { try { Thread.sleep(1000); Thread.sleep(1000); System.out.println("5"); } catch (InterruptedException e) { e.printStackTrace(); } } }, 5); executorService.submit(new Runnable() { @Override public void run() { try { Thread.sleep(1000); Thread.sleep(1000); System.out.println("4"); } catch (InterruptedException e) { e.printStackTrace(); } } }, 4); executorService.shutdown(); try { executorService.awaitTermination(30, TimeUnit.MINUTES); } catch (InterruptedException e) { e.printStackTrace(); } 

但最后,我得不到1 2 3 4 5,我得到这些数字的随机顺序。 测试有问题,还是其他什么问题? 如果首先,如何正确测试?

如果池完全忙并且您提交了几个新任务,则仅考虑优先级。 如果仅使用一个线程定义池,则应获得预期的输出。 在您的示例中,所有任务同时执行,哪个任务首先完成有点随机。

顺便说一下,链接的实现存在问题,如果您的队列已满并且您提交了新任务,则会抛出exception。

请参阅下面一个您正在尝试实现的工作示例(我以简单的方式覆盖newTaskFor ,只是为了使其工作 – 您可能希望改进该部分)。

它打印: 1 2 3 4 5

 public class Test { public static void main(String[] args) { PriorityExecutor executorService = (PriorityExecutor) PriorityExecutor.newFixedThreadPool(1); executorService.submit(getRunnable("1"), 1); executorService.submit(getRunnable("3"), 3); executorService.submit(getRunnable("2"), 2); executorService.submit(getRunnable("5"), 5); executorService.submit(getRunnable("4"), 4); executorService.shutdown(); try { executorService.awaitTermination(30, TimeUnit.MINUTES); } catch (InterruptedException e) { e.printStackTrace(); } } public static Runnable getRunnable(final String id) { return new Runnable() { @Override public void run() { try { Thread.sleep(1000); System.out.println(id); } catch (InterruptedException e) { e.printStackTrace(); } } }; } static class PriorityExecutor extends ThreadPoolExecutor { public PriorityExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue workQueue) { super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue); } //Utitlity method to create thread pool easily public static ExecutorService newFixedThreadPool(int nThreads) { return new PriorityExecutor(nThreads, nThreads, 0L, TimeUnit.MILLISECONDS, new PriorityBlockingQueue()); } //Submit with New comparable task public Future submit(Runnable task, int priority) { return super.submit(new ComparableFutureTask(task, null, priority)); } //execute with New comparable task public void execute(Runnable command, int priority) { super.execute(new ComparableFutureTask(command, null, priority)); } @Override protected  RunnableFuture newTaskFor(Callable callable) { return (RunnableFuture) callable; } @Override protected  RunnableFuture newTaskFor(Runnable runnable, T value) { return (RunnableFuture) runnable; } } static class ComparableFutureTask extends FutureTask implements Comparable> { volatile int priority = 0; public ComparableFutureTask(Runnable runnable, T result, int priority) { super(runnable, result); this.priority = priority; } public ComparableFutureTask(Callable callable, int priority) { super(callable); this.priority = priority; } @Override public int compareTo(ComparableFutureTask o) { return Integer.valueOf(priority).compareTo(o.priority); } } } 

您有16个线程,只有5个任务,这意味着所有这些任务都在同时执行,优先级实际上是无关紧要的。

优先级仅在有任务等待执行时才有意义。

为了说明这一点,如果您将示例设置为仅使用1个线程,您将获得预期的输出。