我应该使用Java中的哪个ThreadPool?

有大量的任务。 每个任务都属于一个组。 要求是每组任务应该像在单个线程中执行一样串行执行,并且吞吐量应该在多核(或多CPU)环境中最大化。 注意:还有大量的组与任务数量成比例。

天真的解决方案是使用ThreadPoolExecutor并同步(或锁定)。 但是,线程会相互阻塞,吞吐量不会最大化。

有什么好主意吗? 或者是否存在满足要求的第三方库?

一种简单的方法是将所有组任务“连接”成一个超级任务,从而使子任务串行运行。 但这可能会导致其他组无法启动的延迟,除非其他组完全完成并在线程池中腾出一些空间。

作为替代方案,请考虑链接组的任务。 以下代码说明了这一点:

public class MultiSerialExecutor { private final ExecutorService executor; public MultiSerialExecutor(int maxNumThreads) { executor = Executors.newFixedThreadPool(maxNumThreads); } public void addTaskSequence(List tasks) { executor.execute(new TaskChain(tasks)); } private void shutdown() { executor.shutdown(); } private class TaskChain implements Runnable { private List seq; private int ind; public TaskChain(List seq) { this.seq = seq; } @Override public void run() { seq.get(ind++).run(); //NOTE: No special error handling if (ind < seq.size()) executor.execute(this); } } 

优点是没有使用额外的资源(线程/队列),并且任务的粒度优于天真方法中的任务。 缺点是所有小组的任务都应事先知道

- 编辑 -

要使此解决方案通用且完整,您可能需要决定error handling(即,即使发生错误,链是否继续),并且实现ExecutorService并将所有调用委托给基础执行程序也是一个好主意。

我建议使用任务队列:

  • 对于每组任务,您已创建队列并将该组中的所有任务插入其中。
  • 现在所有队列都可以并行执行,而一个队列中的任务是串行执行的。

快速谷歌搜索表明java api本身没有任务/线程队列。 但是,有很多关于编码的教程。 如果你知道一些,每个人都可以自由列出好的教程/实现:

我大多同意Dave的答案,但如果你需要在所有“组”中切换CPU时间,即所有任务组都应该并行进行,你可能会发现这种结构很有用(使用remove作为“lock”。这在我的情况虽然我想它往往会使用更多的内存):

 class TaskAllocator { private final ConcurrentLinkedQueue> entireWork = childQueuePerTaskGroup(); public Queue lockTaskGroup(){ return entireWork.poll(); } public void release(Queue taskGroup){ entireWork.offer(taskGroup); } } 

  class DoWork implmements Runnable { private final TaskAllocator allocator; public DoWork(TaskAllocator allocator){ this.allocator = allocator; } pubic void run(){ for(;;){ Queue taskGroup = allocator.lockTaskGroup(); if(task==null){ //No more work return; } Runnable work = taskGroup.poll(); if(work == null){ //This group is done continue; } //Do work, but never forget to release the group to // the allocator. try { work.run(); } finally { allocator.release(taskGroup); } }//for } } 

然后,您可以使用最佳线程数来运行DoWork任务。 这是一种循环负载平衡..

您甚至可以通过在TaskAllocator使用此而不是简单的队列来执行更复杂的TaskAllocator (剩余任务更多的任务组往往会被执行)

 ConcurrentSkipListSet> sophisticatedQueue = new ConcurrentSkipListSet(new SophisticatedComparator()); 

SophisticatedComparator在哪里

 class SophisticatedComparator implements Comparator> { public int compare(MyQueue o1, MyQueue o2){ int diff = o2.size() - o1.size(); if(diff==0){ //This is crucial. You must assign unique ids to your //Subqueue and break the equality if they happen to have same size. //Otherwise your queues will disappear... return o1.id - o2.id; } return diff; } } 

Actor也是此指定类型问题的另一种解决方案。 Scala有演员和Java,由AKKA提供。

我有一个类似于你的问题,我使用ExecutorCompletionServiceExecutor一起完成任务集合。 以下是java.util.concurrent API的摘录,自Java7开始:

假设你有一组针对某个问题的求解器,每个都返回某个类型的Result值,并希望同时运行它们,处理每个返回非空值的结果,在某些方法中使用(结果) R)。 你可以这样写:

 void solve(Executor e, Collection> solvers) throws InterruptedException, ExecutionException { CompletionService ecs = new ExecutorCompletionService(e); for (Callable s : solvers) ecs.submit(s); int n = solvers.size(); for (int i = 0; i < n; ++i) { Result r = ecs.take().get(); if (r != null) use(r); } } 

因此,在您的方案中,每个任务都将是一个Callable ,任务将分组在Collection>

参考: http : //docs.oracle.com/javase/7/docs/api/java/util/concurrent/ExecutorCompletionService.html