如何实现ExecutorService以轮换为基础执行任务?

我正在使用带有固定线程池的 java.util.concurrent.ExecutorService来执行任务列表。 我的任务列表通常大约是80到150,我将任何时候运行的线程数限制为10,如下所示:

ExecutorService threadPoolService = Executors.newFixedThreadPool(10); for ( Runnable task : myTasks ) { threadPoolService.submit(task); } 

我的用例要求甚至已完成的任务应该再次重新提交给ExecutorService,但只有在所有提交的任务都得到服务/完成时才应该执行/再次执行。 基本上,提交的任务应该以轮换方式执行。 因此,在这种情况下,不会有threadPoolService.shutdown()threadPoolService.shutdownNow()调用。

我的问题是,如何实现ExecutorService服务轮换基础任务?

ThreadPoolExecutor为afterExecution提供了一个扩展点,您可以将作业放回队列的末尾。

 public class TaskRepeatingThreadPoolExecutor extends ThreadPoolExecutor { public TaskRepeatingThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue workQueue) { super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue); } @Override protected void afterExecute(Runnable r, Throwable t) { super.afterExecute(r, t); this.submit(r); } } 

当然,如果没有ExecutorService方便的工厂方法的帮助,你必须做更多的工作来自己实例化它,但构造函数很简单,可以理解。

答案与用于ExecutorService实例的工作队列的实现更相关。 所以,我建议:

  1. 首先选择提供循环队列function的java.util.concurrent.BlockingQueue ( 示例 )的实现。 注意 ,选择BlockingQueue的原因是要等到下一个任务提供给队列; 因此,在循环+阻塞队列的情况下,您应该小心如何提供相同的行为和function。

  2. 而不是使用Executors.new...来创建一个新的ThreadPoolExecutor使用直接构造函数 ,如

public ThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue workQueue)

这样,除非您命令执行程序shutdown ,否则它将尝试从队列中获取下一个任务,以便从其工作队列中执行,该队列是任务的循环容器。

我建议以下解决方案完全使用标准库并发工具中存在的function。 它使用带有任务装饰器类的CyclicBarrier和重新提交所有任务的屏障操作:

 import java.util.ArrayList; import java.util.List; import java.util.concurrent.BrokenBarrierException; import java.util.concurrent.CyclicBarrier; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; public class Rotation { private static final class RotationDecorator implements Runnable { private final Runnable task; private final CyclicBarrier barrier; RotationDecorator( Runnable task, CyclicBarrier barrier ) { this.task = task; this.barrier = barrier; } @Override public void run() { this.task.run(); try { this.barrier.await(); } catch(InterruptedException e) { ; // Consider better exception handling } catch(BrokenBarrierException e) { ; // Consider better exception handling } } } public void startRotation( List tasks ) { final ExecutorService threadPoolService = Executors.newFixedThreadPool( 10 ); final List rotatingTasks = new ArrayList( tasks.size() ); final CyclicBarrier barrier = new CyclicBarrier( tasks.size(), new Runnable() { @Override public void run() { Rotation.this.enqueueTasks( threadPoolService, rotatingTasks ); } } ); for(Runnable task : tasks) { rotatingTasks.add( new RotationDecorator( task, barrier ) ); } this.enqueueTasks( threadPoolService, rotatingTasks ); } private void enqueueTasks( ExecutorService service, List tasks ) { for(Runnable task : tasks) { service.submit( task ); } } } 

您可以简单地检查所有任务是否已执行,并在情况发生后重新提交,例如:

  List futures = new ArrayList<>(); for (Runnable task : myTasks) { futures.add(threadPoolService.submit(task)); } //wait until completion of all tasks for (Future f : futures) { f.get(); } //restart ...... 

编辑
您似乎想在任务完成后立即重新提交。 您可以使用ExecutorCompletionService ,它允许您在执行任务时检索任务, – 请参阅下面的一个简单示例,其中包含2个任务,一旦完成就会重新提交几次。 样本输出:

任务1提交了pool-1-thread-1
任务2提交了pool-1-thread-2
任务1完成了pool-1-thread-1
任务1提交了pool-1-thread-3
任务2完成了pool-1-thread-2
任务1完成了pool-1-thread-3
任务2提交了pool-1-thread-4
任务1提交了pool-1-thread-5
任务1完成了pool-1-thread-5
任务2完成了pool-1-thread-4

 public class Test1 { public final ConcurrentMap concurrentMap = new ConcurrentHashMap<>(); public final AtomicInteger retries = new AtomicInteger(); public final Object lock = new Object(); public static void main(String[] args) throws InterruptedException, ExecutionException { int count = 0; List myTasks = new ArrayList<>(); myTasks.add(getRunnable(1)); myTasks.add(getRunnable(2)); ExecutorService threadPoolService = Executors.newFixedThreadPool(10); CompletionService ecs = new ExecutorCompletionService(threadPoolService); for (Runnable task : myTasks) { ecs.submit(task, task); } //wait until completion of all tasks while(count++ < 3) { Runnable task = ecs.take().get(); ecs.submit(task, task); } threadPoolService.shutdown(); } private static Runnable getRunnable(final int i) { return new Runnable() { @Override public void run() { System.out.println("Task " + i + " submitted " + Thread.currentThread().getName() + " "); try { Thread.sleep(500 * i); } catch (InterruptedException ex) { System.out.println("Interrupted"); } System.out.println("Task " + i + " completed " + Thread.currentThread().getName() + " "); } }; } }