如何使用ThreadPoolExecutor和自定义任务实现PriorityBlockingQueue

我经常搜索,但找不到解决问题的方法。

我有自己的类BaseTask ,它使用ThreadPoolExecutor来处理任务。
如果我不想要优先级(即使用LinkedBlockingQueue ),这可以正常工作,但是当我尝试使用PriorityBlockingQueue我得到ClassCastException因为ThreadPoolExecutor将我的Tasks包装到FutureTask对象中。
这显然是可以的,因为FutureTask没有实现Comparable ,但我将如何继续解决优先级问题?
我已经读过你可以在ThreadPoolExecutor覆盖newTaskFor ,但我似乎根本找不到这个方法……?

我们欢迎所有的建议!

一些代码可以帮助:

在我的BaseTask课程中,我有

 private static final BlockingQueue sWorkQueue = new PriorityBlockingQueue(); private static final ThreadFactory sThreadFactory = new ThreadFactory() { private final AtomicInteger mCount = new AtomicInteger(1); public Thread newThread(Runnable r) { return new Thread(r, "AsyncTask #" + mCount.getAndIncrement()); } }; private static final BaseThreadPoolExecutor sExecutor = new BaseThreadPoolExecutor( 1, Integer.MAX_VALUE, 10, TimeUnit.SECONDS, sWorkQueue, sThreadFactory); private final BaseFutureTask mFuture; public BaseTask(int priority) { mFuture = new BaseFutureTask(mWorker, priority); } public final BaseTask execute(Params... params) { /* Some unimportant code here */ sExecutor.execute(mFuture); } 

BaseFutureTask类中

 @Override public int compareTo(BaseFutureTask another) { long diff = this.priority - another.priority; return Long.signum(diff); } 

BaseThreadPoolExecutor类中,我重写了3个submit方法……
调用此类中的构造函数,但不调用任何submit方法

 public class ExecutorPriority { public static void main(String[] args) { PriorityBlockingQueue pq = new PriorityBlockingQueue(20, new ComparePriority()); Executor exe = new ThreadPoolExecutor(1, 2, 10, TimeUnit.SECONDS, pq); exe.execute(new RunWithPriority(2) { @Override public void run() { System.out.println(this.getPriority() + " started"); try { Thread.sleep(3000); } catch (InterruptedException ex) { Logger.getLogger(ExecutorPriority.class.getName()).log(Level.SEVERE, null, ex); } System.out.println(this.getPriority() + " finished"); } }); exe.execute(new RunWithPriority(10) { @Override public void run() { System.out.println(this.getPriority() + " started"); try { Thread.sleep(3000); } catch (InterruptedException ex) { Logger.getLogger(ExecutorPriority.class.getName()).log(Level.SEVERE, null, ex); } System.out.println(this.getPriority() + " finished"); } }); } private static class ComparePriority implements Comparator { @Override public int compare(T o1, T o2) { return o1.getPriority().compareTo(o2.getPriority()); } } 

}

你可以猜测RunWithPriority是一个抽象类,它是Runnable并且有一个Integer优先级字段

您可以使用这些帮助程序类:

 public class PriorityFuture implements RunnableFuture { private RunnableFuture src; private int priority; public PriorityFuture(RunnableFuture other, int priority) { this.src = other; this.priority = priority; } public int getPriority() { return priority; } public boolean cancel(boolean mayInterruptIfRunning) { return src.cancel(mayInterruptIfRunning); } public boolean isCancelled() { return src.isCancelled(); } public boolean isDone() { return src.isDone(); } public T get() throws InterruptedException, ExecutionException { return src.get(); } public T get(long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException { return src.get(); } public void run() { src.run(); } public static Comparator COMP = new Comparator() { public int compare(Runnable o1, Runnable o2) { if (o1 == null && o2 == null) return 0; else if (o1 == null) return -1; else if (o2 == null) return 1; else { int p1 = ((PriorityFuture) o1).getPriority(); int p2 = ((PriorityFuture) o2).getPriority(); return p1 > p2 ? 1 : (p1 == p2 ? 0 : -1); } } }; } 

 public interface PriorityCallable extends Callable { int getPriority(); } 

这个帮手方法:

 public static ThreadPoolExecutor getPriorityExecutor(int nThreads) { return new ThreadPoolExecutor(nThreads, nThreads, 0L, TimeUnit.MILLISECONDS, new PriorityBlockingQueue(10, PriorityFuture.COMP)) { protected  RunnableFuture newTaskFor(Callable callable) { RunnableFuture newTaskFor = super.newTaskFor(callable); return new PriorityFuture(newTaskFor, ((PriorityCallable) callable).getPriority()); } }; } 

然后像这样使用它:

 class LenthyJob implements PriorityCallable { private int priority; public LenthyJob(int priority) { this.priority = priority; } public Long call() throws Exception { System.out.println("Executing: " + priority); long num = 1000000; for (int i = 0; i < 1000000; i++) { num *= Math.random() * 1000; num /= Math.random() * 1000; if (num == 0) num = 1000000; } return num; } public int getPriority() { return priority; } } public class TestPQ { public static void main(String[] args) throws InterruptedException, ExecutionException { ThreadPoolExecutor exec = getPriorityExecutor(2); for (int i = 0; i < 20; i++) { int priority = (int) (Math.random() * 100); System.out.println("Scheduling: " + priority); LenthyJob job = new LenthyJob(priority); exec.submit(job); } } } 

我的解决方案

 public class XThreadPoolExecutor extends ThreadPoolExecutor { public XThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, PriorityBlockingQueue workQueue) { super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue); } public XThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, PriorityBlockingQueue workQueue, RejectedExecutionHandler handler) { super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, handler); } public XThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, PriorityBlockingQueue workQueue, ThreadFactory threadFactory) { super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, threadFactory); } public XThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, PriorityBlockingQueue workQueue, ThreadFactory threadFactory, RejectedExecutionHandler handler) { super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, threadFactory, handler); } protected  RunnableFuture newTaskFor(Runnable runnable, T value) { return new ComparableFutureTask<>(runnable, value); } protected  RunnableFuture newTaskFor(Callable callable) { return new ComparableFutureTask<>(callable); } protected class ComparableFutureTask extends FutureTask implements Comparable> { private Object object; public ComparableFutureTask(Callable callable) { super(callable); object = callable; } public ComparableFutureTask(Runnable runnable, V result) { super(runnable, result); object = runnable; } @Override @SuppressWarnings("unchecked") public int compareTo(ComparableFutureTask o) { if (this == o) { return 0; } if (o == null) { return -1; // high priority } if (object != null && o.object != null) { if (object.getClass().equals(o.object.getClass())) { if (object instanceof Comparable) { return ((Comparable) object).compareTo(o.object); } } } return 0; } } } 

我将尝试用function齐全的代码解释这个问题。 但在深入研究代码之前,我想解释一下PriorityBlockingQueue

PriorityBlockingQueue :PriorityBlockingQueue是BlockingQueue的实现。 它接受任务及其优先级,并首先提交具有最高优先级的任务。 如果任何两个任务具有相同的优先级,那么我们需要提供一些自定义逻辑来决定首先执行哪个任务。

现在让我们直接进入代码。

驱动程序类 :此类创建一个执行程序,它接受任务并稍后提交它们以供执行。 这里我们创建两个任务,一个具有LOW优先级,另一个具有HIGH优先级。 在这里,我们告诉执行程序运行MAX的1个线程并使用PriorityBlockingQueue。

  public static void main(String[] args) { /* Minimum number of threads that must be running : 0 Maximium number of threads that can be created : 1 If a thread is idle, then the minimum time to keep it alive : 1000 Which queue to use : PriorityBlockingQueue */ PriorityBlockingQueue queue = new PriorityBlockingQueue(); ThreadPoolExecutor executor = new ThreadPoolExecutor(0,1, 1000, TimeUnit.MILLISECONDS,queue); MyTask task = new MyTask(Priority.LOW,"Low"); executor.execute(new MyFutureTask(task)); task = new MyTask(Priority.HIGH,"High"); executor.execute(new MyFutureTask(task)); } 

MyTask类 :MyTask实现Runnable并接受优先级作为构造函数中的参数。 当此任务运行时,它会打印一条消息,然后让线程进入hibernate状态1秒钟。

  public class MyTask implements Runnable { public int getPriority() { return priority.getValue(); } private Priority priority; public String getName() { return name; } private String name; public MyTask(Priority priority,String name){ this.priority = priority; this.name = name; } @Override public void run() { System.out.println("The following Runnable is getting executed "+getName()); try { Thread.sleep(1000); } catch (InterruptedException e) { e.printStackTrace(); } } } 

MyFutureTask类 :由于我们使用PriorityBlocingQueue来保存我们的任务,我们的任务必须包含在FutureTask中,我们的FutureTask实现必须实现Comparable接口。 Comparable接口比较2个不同任务的优先级,并提交具有最高执行优先级的任务。

  public class MyFutureTask extends FutureTask implements Comparable { private MyTask task = null; public MyFutureTask(MyTask task){ super(task,null); this.task = task; } @Override public int compareTo(MyFutureTask another) { return task.getPriority() - another.task.getPriority(); } } 

优先级 :自解释优先级。

 public enum Priority { HIGHEST(0), HIGH(1), MEDIUM(2), LOW(3), LOWEST(4); int value; Priority(int val) { this.value = val; } public int getValue(){ return value; } } 

现在,当我们运行此示例时,我们得到以下输出

 The following Runnable is getting executed High The following Runnable is getting executed Low 

即使我们先提交LOW优先级,但稍后提交HIGH优先级任务,但由于我们使用PriorityBlockingQueue,优先级较高的任务将首先执行。

看起来他们离开了阿帕奇的和谐。 大约一年前有一个svn提交日志 ,修复了缺少newTaskFor 。 您可以在扩展的ThreadPoolExecutor覆盖submit函数,以创建可Comparable的扩展FutureTask 。 它们不是很长 。

回答你的问题: newTaskFor()方法可以在ThreadPoolExecutor的超类AbstractExecutorService 。 但是,您可以在ThreadPoolExecutor简单地覆盖它。