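How to implement a PriorityBlockingQueue with ThreadPoolExecutor and custom tasks
I have searched a lot but could not find a solution to my problem.
I have my own class, BaseTask, which uses a ThreadPoolExecutor to process tasks.
If I don't need priorities (that is, with a LinkedBlockingQueue) this works fine, but when I try to use a PriorityBlockingQueue I get a ClassCastException, because ThreadPoolExecutor wraps my tasks in FutureTask objects.
That is understandable, since FutureTask does not implement Comparable, but how should I go about solving the priority problem?
I have read that you can override newTaskFor in ThreadPoolExecutor, but I can't seem to find this method at all...?
All suggestions are welcome!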
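For readers who want to see the failure in isolation, here is a minimal sketch of the situation described above. The class name ClassCastRepro and its method are made up for illustration and are not part of the asker's code. It assumes a pool with a single worker, so that the second submitted task has to be placed in the PriorityBlockingQueue, where the plain FutureTask created by submit() cannot be cast to Comparable.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.PriorityBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

class ClassCastRepro {

    /** Returns true if queuing a task wrapped by submit() fails with ClassCastException. */
    static boolean submitFailsWithClassCast() throws InterruptedException {
        ThreadPoolExecutor pool = new ThreadPoolExecutor(
                1, 1, 0L, TimeUnit.MILLISECONDS, new PriorityBlockingQueue<Runnable>());
        CountDownLatch release = new CountDownLatch(1);
        try {
            // The first task is handed directly to the single worker and keeps it busy.
            pool.submit(() -> {
                try {
                    release.await();
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
            });
            // The second task has to be queued. submit() wrapped it in a FutureTask,
            // which is not Comparable, so the queue cannot order it.
            pool.submit(() -> { });
            return false;
        } catch (ClassCastException expected) {
            return true;
        } finally {
            release.countDown();
            pool.shutdown();
            pool.awaitTermination(5, TimeUnit.SECONDS);
        }
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println("ClassCastException thrown: " + submitFailsWithClassCast());
    }
}
```

The exception comes from the queue's offer() call, so it surfaces on the submitting thread rather than inside a worker.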
Some code that may help:
In my BaseTask class I have
    // Every element placed in this queue must be Comparable.
    private static final BlockingQueue<Runnable> sWorkQueue = new PriorityBlockingQueue<Runnable>();

    private static final ThreadFactory sThreadFactory = new ThreadFactory() {
        private final AtomicInteger mCount = new AtomicInteger(1);

        public Thread newThread(Runnable r) {
            return new Thread(r, "AsyncTask #" + mCount.getAndIncrement());
        }
    };

    private static final BaseThreadPoolExecutor sExecutor = new BaseThreadPoolExecutor(
            1, Integer.MAX_VALUE, 10, TimeUnit.SECONDS, sWorkQueue, sThreadFactory);

    private final BaseFutureTask mFuture;

    public BaseTask(int priority) {
        mFuture = new BaseFutureTask(mWorker, priority);
    }

    public final BaseTask execute(Params... params) {
        /* Some unimportant code here */
        sExecutor.execute(mFuture);
    }
In the BaseFutureTask class:

    @Override
    public int compareTo(BaseFutureTask another) {
        // Widen before subtracting so that an int priority cannot overflow.
        long diff = (long) this.priority - another.priority;
        return Long.signum(diff);
    }
In the BaseThreadPoolExecutor class I override the three submit methods...
The constructor of this class gets called, but none of the submit methods do.
    import java.util.Comparator;
    import java.util.concurrent.Executor;
    import java.util.concurrent.PriorityBlockingQueue;
    import java.util.concurrent.ThreadPoolExecutor;
    import java.util.concurrent.TimeUnit;
    import java.util.logging.Level;
    import java.util.logging.Logger;

    public class ExecutorPriority {

        public static void main(String[] args) {
            // The queue orders tasks with an explicit Comparator, so they need not be Comparable.
            PriorityBlockingQueue<Runnable> pq =
                    new PriorityBlockingQueue<Runnable>(20, new ComparePriority());
            Executor exe = new ThreadPoolExecutor(1, 2, 10, TimeUnit.SECONDS, pq);

            // execute() queues the Runnable as-is, without wrapping it in a FutureTask.
            exe.execute(new RunWithPriority(2) {
                @Override
                public void run() {
                    System.out.println(this.getPriority() + " started");
                    try {
                        Thread.sleep(3000);
                    } catch (InterruptedException ex) {
                        Logger.getLogger(ExecutorPriority.class.getName()).log(Level.SEVERE, null, ex);
                    }
                    System.out.println(this.getPriority() + " finished");
                }
            });
            exe.execute(new RunWithPriority(10) {
                @Override
                public void run() {
                    System.out.println(this.getPriority() + " started");
                    try {
                        Thread.sleep(3000);
                    } catch (InterruptedException ex) {
                        Logger.getLogger(ExecutorPriority.class.getName()).log(Level.SEVERE, null, ex);
                    }
                    System.out.println(this.getPriority() + " finished");
                }
            });
        }

        private static class ComparePriority<T extends RunWithPriority> implements Comparator<T> {
            @Override
            public int compare(T o1, T o2) {
                return o1.getPriority().compareTo(o2.getPriority());
            }
        }
    }
As you can guess, RunWithPriority is an abstract class that implements Runnable and has an Integer priority field.
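Since RunWithPriority itself is not shown, here is one way it might look, together with a small check that queued tasks really run in priority order. This is only a sketch: the field layout, the wrapper class RunWithPrioritySketch and the helper runInPriorityOrder are assumptions, and the pool is limited to one thread so that the execution order is easy to observe.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.Comparator;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.PriorityBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

class RunWithPrioritySketch {

    /** Assumed shape of RunWithPriority: a Runnable that carries an Integer priority. */
    abstract static class RunWithPriority implements Runnable {
        private final Integer priority;

        RunWithPriority(int priority) {
            this.priority = priority;
        }

        Integer getPriority() {
            return priority;
        }
    }

    /** Queues one task per priority behind a blocked worker and returns the order they ran in. */
    static List<Integer> runInPriorityOrder(int... priorities) throws InterruptedException {
        Comparator<Runnable> byPriority =
                Comparator.comparing((Runnable r) -> ((RunWithPriority) r).getPriority());
        ThreadPoolExecutor pool = new ThreadPoolExecutor(
                1, 1, 0L, TimeUnit.MILLISECONDS,
                new PriorityBlockingQueue<Runnable>(20, byPriority));
        List<Integer> order = Collections.synchronizedList(new ArrayList<>());
        CountDownLatch gate = new CountDownLatch(1);

        // The first task goes straight to the single worker and never enters the queue,
        // so a plain lambda is fine here. It blocks until the gate opens.
        pool.execute(() -> {
            try {
                gate.await();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });

        // These tasks pile up in the priority queue while the worker is blocked.
        for (int p : priorities) {
            pool.execute(new RunWithPriority(p) {
                @Override
                public void run() {
                    order.add(getPriority());
                }
            });
        }

        gate.countDown();
        pool.shutdown();
        pool.awaitTermination(5, TimeUnit.SECONDS);
        return order;
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println(runInPriorityOrder(10, 2, 7)); // smallest number runs first
    }
}
```

Note that this approach only works with execute(). Tasks passed to submit() are wrapped in a FutureTask before they reach the queue, and the cast inside the comparator would then fail.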
You can use these helper classes:
    public class PriorityFuture<T> implements RunnableFuture<T> {

        private RunnableFuture<T> src;
        private int priority;

        public PriorityFuture(RunnableFuture<T> other, int priority) {
            this.src = other;
            this.priority = priority;
        }

        public int getPriority() {
            return priority;
        }

        public boolean cancel(boolean mayInterruptIfRunning) {
            return src.cancel(mayInterruptIfRunning);
        }

        public boolean isCancelled() {
            return src.isCancelled();
        }

        public boolean isDone() {
            return src.isDone();
        }

        public T get() throws InterruptedException, ExecutionException {
            return src.get();
        }

        public T get(long timeout, TimeUnit unit)
                throws InterruptedException, ExecutionException, TimeoutException {
            // Pass the timeout through; calling src.get() here would block indefinitely.
            return src.get(timeout, unit);
        }

        public void run() {
            src.run();
        }

        // Orders queued tasks by ascending priority value; assumes every element is a PriorityFuture.
        public static final Comparator<Runnable> COMP = new Comparator<Runnable>() {
            public int compare(Runnable o1, Runnable o2) {
                if (o1 == null && o2 == null)
                    return 0;
                else if (o1 == null)
                    return -1;
                else if (o2 == null)
                    return 1;
                else {
                    int p1 = ((PriorityFuture<?>) o1).getPriority();
                    int p2 = ((PriorityFuture<?>) o2).getPriority();
                    return p1 > p2 ? 1 : (p1 == p2 ? 0 : -1);
                }
            }
        };
    }
and

    public interface PriorityCallable<T> extends Callable<T> {
        int getPriority();
    }

and this helper method:
    public static ThreadPoolExecutor getPriorityExecutor(int nThreads) {
        return new ThreadPoolExecutor(nThreads, nThreads, 0L, TimeUnit.MILLISECONDS,
                new PriorityBlockingQueue<Runnable>(10, PriorityFuture.COMP)) {

            // Called by submit(Callable); wraps the default FutureTask so that it carries a priority.
            @Override
            protected <T> RunnableFuture<T> newTaskFor(Callable<T> callable) {
                RunnableFuture<T> newTaskFor = super.newTaskFor(callable);
                return new PriorityFuture<T>(newTaskFor,
                        ((PriorityCallable<T>) callable).getPriority());
            }
        };
    }
Then use it like this:
    class LenthyJob implements PriorityCallable<Long> {
        private int priority;

        public LenthyJob(int priority) {
            this.priority = priority;
        }

        public Long call() throws Exception {
            System.out.println("Executing: " + priority);
            // Busy work so that later jobs have time to pile up in the queue.
            long num = 1000000;
            for (int i = 0; i < 1000000; i++) {
                num *= Math.random() * 1000;
                num /= Math.random() * 1000;
                if (num == 0)
                    num = 1000000;
            }
            return num;
        }

        public int getPriority() {
            return priority;
        }
    }

    public class TestPQ {

        public static void main(String[] args) throws InterruptedException, ExecutionException {
            ThreadPoolExecutor exec = getPriorityExecutor(2);

            for (int i = 0; i < 20; i++) {
                int priority = (int) (Math.random() * 100);
                System.out.println("Scheduling: " + priority);
                LenthyJob job = new LenthyJob(priority);
                exec.submit(job);
            }
            // Let the queued jobs finish, then allow the JVM to exit.
            exec.shutdown();
        }
    }
My solution:
    public class XThreadPoolExecutor extends ThreadPoolExecutor {

        public XThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime,
                TimeUnit unit, PriorityBlockingQueue<Runnable> workQueue) {
            super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue);
        }

        public XThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime,
                TimeUnit unit, PriorityBlockingQueue<Runnable> workQueue,
                RejectedExecutionHandler handler) {
            super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, handler);
        }

        public XThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime,
                TimeUnit unit, PriorityBlockingQueue<Runnable> workQueue,
                ThreadFactory threadFactory) {
            super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, threadFactory);
        }

        public XThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime,
                TimeUnit unit, PriorityBlockingQueue<Runnable> workQueue,
                ThreadFactory threadFactory, RejectedExecutionHandler handler) {
            super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, threadFactory, handler);
        }

        // Both factory methods are used by submit(); the returned task is what gets queued.
        @Override
        protected <T> RunnableFuture<T> newTaskFor(Runnable runnable, T value) {
            return new ComparableFutureTask<>(runnable, value);
        }

        @Override
        protected <T> RunnableFuture<T> newTaskFor(Callable<T> callable) {
            return new ComparableFutureTask<>(callable);
        }

        protected class ComparableFutureTask<V> extends FutureTask<V>
                implements Comparable<ComparableFutureTask<V>> {

            // The original Runnable or Callable, kept so that its own compareTo can be used.
            private Object object;

            public ComparableFutureTask(Callable<V> callable) {
                super(callable);
                object = callable;
            }

            public ComparableFutureTask(Runnable runnable, V result) {
                super(runnable, result);
                object = runnable;
            }

            @Override
            @SuppressWarnings("unchecked")
            public int compareTo(ComparableFutureTask<V> o) {
                if (this == o) {
                    return 0;
                }
                if (o == null) {
                    return -1; // high priority
                }
                // Delegate to the wrapped task only when both tasks are of the same Comparable type.
                if (object != null && o.object != null) {
                    if (object.getClass().equals(o.object.getClass())) {
                        if (object instanceof Comparable) {
                            return ((Comparable) object).compareTo(o.object);
                        }
                    }
                }
                return 0;
            }
        }
    }
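I will try to explain this problem with fully functional code. But before diving into the code, I would like to explain PriorityBlockingQueue.
PriorityBlockingQueue: PriorityBlockingQueue is an implementation of BlockingQueue. It orders the tasks it holds by priority and hands out the task with the highest priority first. If two tasks have the same priority, the queue makes no promise about their relative order, so we need to provide some custom logic to decide which of them runs first.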
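As an illustration of such custom logic, a common approach is to add a secondary key, for example a sequence number assigned at creation time, so that equal-priority tasks come out in first-in, first-out order. The sketch below is not part of the answer's code: the names FifoTieBreakSketch, Entry and SEQUENCE are hypothetical, and a lower number is assumed to mean a higher priority.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.PriorityBlockingQueue;
import java.util.concurrent.atomic.AtomicLong;

class FifoTieBreakSketch {

    // Global counter that stamps each entry with its creation order.
    private static final AtomicLong SEQUENCE = new AtomicLong();

    static final class Entry implements Comparable<Entry> {
        final int priority;
        final String name;
        final long seq = SEQUENCE.getAndIncrement();

        Entry(int priority, String name) {
            this.priority = priority;
            this.name = name;
        }

        @Override
        public int compareTo(Entry other) {
            // Primary key: priority. Secondary key: creation order, which breaks ties FIFO.
            int byPriority = Integer.compare(priority, other.priority);
            return byPriority != 0 ? byPriority : Long.compare(seq, other.seq);
        }
    }

    /** Adds a few entries and returns their names in the order the queue releases them. */
    static List<String> drainOrder() {
        PriorityBlockingQueue<Entry> queue = new PriorityBlockingQueue<>();
        queue.add(new Entry(1, "low-1"));
        queue.add(new Entry(0, "high"));
        queue.add(new Entry(1, "low-2"));
        queue.add(new Entry(1, "low-3"));

        List<String> names = new ArrayList<>();
        for (Entry e = queue.poll(); e != null; e = queue.poll()) {
            names.add(e.name);
        }
        return names;
    }

    public static void main(String[] args) {
        System.out.println(drainOrder()); // [high, low-1, low-2, low-3]
    }
}
```

The same two-key comparison can be placed in the compareTo method of a FutureTask subclass when the queue belongs to an executor.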
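Now let's get right into the code.
Driver class: This class creates an executor that accepts tasks and later submits them for execution. Here we create two tasks, one with LOW priority and one with HIGH priority. We tell the executor to run at most 1 thread and to use a PriorityBlockingQueue.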
    public static void main(String[] args) {
        /*
           Minimum number of threads that must be running : 0
           Maximum number of threads that can be created : 1
           If a thread is idle, then the minimum time to keep it alive : 1000
           Which queue to use : PriorityBlockingQueue
         */
        PriorityBlockingQueue<Runnable> queue = new PriorityBlockingQueue<Runnable>();

        ThreadPoolExecutor executor = new ThreadPoolExecutor(0, 1,
                1000, TimeUnit.MILLISECONDS, queue);

        MyTask task = new MyTask(Priority.LOW, "Low");
        executor.execute(new MyFutureTask(task));

        task = new MyTask(Priority.HIGH, "High");
        executor.execute(new MyFutureTask(task));
    }
MyTask class: MyTask implements Runnable and accepts a priority as a constructor argument. When the task runs, it prints a message and then puts the thread to sleep for 1 second.
    public class MyTask implements Runnable {

        private Priority priority;
        private String name;

        public MyTask(Priority priority, String name) {
            this.priority = priority;
            this.name = name;
        }

        public int getPriority() {
            return priority.getValue();
        }

        public String getName() {
            return name;
        }

        @Override
        public void run() {
            System.out.println("The following Runnable is getting executed " + getName());
            try {
                Thread.sleep(1000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    }
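MyFutureTask class: Since we use a PriorityBlockingQueue to hold our tasks, we wrap each task in a FutureTask, and our FutureTask implementation must implement the Comparable interface. Its compareTo method compares the priorities of two tasks so that the task with the higher priority is handed out for execution first.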
    public class MyFutureTask extends FutureTask<Void> implements Comparable<MyFutureTask> {

        private MyTask task = null;

        public MyFutureTask(MyTask task) {
            super(task, null);
            this.task = task;
        }

        @Override
        public int compareTo(MyFutureTask another) {
            // A smaller value means a higher priority and therefore sorts first.
            return Integer.compare(task.getPriority(), another.task.getPriority());
        }
    }
Priority: a self-explanatory priority enum.
    public enum Priority {

        HIGHEST(0),
        HIGH(1),
        MEDIUM(2),
        LOW(3),
        LOWEST(4);

        int value;

        Priority(int val) {
            this.value = val;
        }

        public int getValue() {
            return value;
        }
    }
Now when we run this example, we get the following output:
    The following Runnable is getting executed High
    The following Runnable is getting executed Low
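Even though we submitted the LOW-priority task first and the HIGH-priority task afterwards, the higher-priority task runs first because we use a PriorityBlockingQueue. Note that this depends on both tasks already being in the queue when the single worker thread takes its first task; with a corePoolSize of 0 that is usually, but not necessarily, the case.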
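It looks like they left that out of Apache Harmony. There is an svn commit log from about a year ago that fixes the missing newTaskFor. You can probably just override the submit functions in an extended ThreadPoolExecutor to create an extended FutureTask that is Comparable. They are not very long.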
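A rough sketch of that idea follows, for the case where newTaskFor is not available to override. Everything here is illustrative: the Prioritized interface, ComparableFutureTask, PriorityExecutor and RecordingTask are hypothetical names, only submit(Runnable) is overridden, and the other two submit overloads would need the same treatment.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Future;
import java.util.concurrent.FutureTask;
import java.util.concurrent.PriorityBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

class SubmitOverrideSketch {

    /** Hypothetical marker for tasks that know their own priority (lower runs first). */
    interface Prioritized {
        int getPriority();
    }

    /** A FutureTask that a PriorityBlockingQueue can order. */
    static final class ComparableFutureTask<T> extends FutureTask<T>
            implements Comparable<ComparableFutureTask<?>> {
        private final int priority;

        ComparableFutureTask(Runnable task, T result, int priority) {
            super(task, result);
            this.priority = priority;
        }

        @Override
        public int compareTo(ComparableFutureTask<?> other) {
            return Integer.compare(priority, other.priority);
        }
    }

    static final class PriorityExecutor extends ThreadPoolExecutor {
        PriorityExecutor(int threads) {
            super(threads, threads, 0L, TimeUnit.MILLISECONDS,
                    new PriorityBlockingQueue<Runnable>());
        }

        // Build the Comparable wrapper ourselves instead of relying on newTaskFor.
        @Override
        public Future<?> submit(Runnable task) {
            int priority = (task instanceof Prioritized)
                    ? ((Prioritized) task).getPriority()
                    : Integer.MAX_VALUE; // tasks without a priority run last
            ComparableFutureTask<Void> future =
                    new ComparableFutureTask<Void>(task, null, priority);
            execute(future);
            return future;
        }
    }

    static final class RecordingTask implements Runnable, Prioritized {
        private final int priority;
        private final List<Integer> order;

        RecordingTask(int priority, List<Integer> order) {
            this.priority = priority;
            this.order = order;
        }

        public int getPriority() {
            return priority;
        }

        public void run() {
            order.add(priority);
        }
    }

    /** Queues tasks with priorities 3, 1, 2 behind a blocked worker and returns the run order. */
    static List<Integer> demo() throws InterruptedException {
        PriorityExecutor pool = new PriorityExecutor(1);
        List<Integer> order = Collections.synchronizedList(new ArrayList<>());
        CountDownLatch gate = new CountDownLatch(1);
        // Keep the only worker busy so that the following tasks are queued.
        pool.submit(() -> {
            try {
                gate.await();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        for (int p : new int[] {3, 1, 2}) {
            pool.submit(new RecordingTask(p, order));
        }
        gate.countDown();
        pool.shutdown();
        pool.awaitTermination(5, TimeUnit.SECONDS);
        return order;
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println(demo()); // [1, 2, 3]
    }
}
```

Plain Runnables passed to execute() bypass this wrapper, so with this design they would still fail with a ClassCastException once they have to be queued.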
To answer your question: the newTaskFor() method can be found in ThreadPoolExecutor's superclass, AbstractExecutorService. You can, however, simply override it in your ThreadPoolExecutor subclass.