如何实现ExecutorService以轮换为基础执行任务?
我正在使用带有固定线程池的 java.util.concurrent.ExecutorService来执行任务列表。 我的任务列表通常大约是80到150,我将任何时候运行的线程数限制为10,如下所示:
ExecutorService threadPoolService = Executors.newFixedThreadPool(10); for ( Runnable task : myTasks ) { threadPoolService.submit(task); }
我的用例要求甚至已完成的任务应该再次重新提交给ExecutorService,但只有在所有已提交的任务都得到服务/完成时才应该执行/再次执行。 基本上,提交的任务应该以轮换方式执行。 因此,在这种情况下,不会有threadPoolService.shutdown()
或threadPoolService.shutdownNow()
调用。
我的问题是,如何实现ExecutorService服务轮换基础任务?
ThreadPoolExecutor为afterExecution提供了一个扩展点,您可以将作业放回队列的末尾。
public class TaskRepeatingThreadPoolExecutor extends ThreadPoolExecutor { public TaskRepeatingThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue workQueue) { super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue); } @Override protected void afterExecute(Runnable r, Throwable t) { super.afterExecute(r, t); this.submit(r); } }
当然,如果没有ExecutorService
方便的工厂方法的帮助,你必须做更多的工作来自己实例化它,但构造函数很简单,可以理解。
答案与用于ExecutorService
实例的工作队列的实现更相关。 所以,我建议:
-
首先选择提供循环队列function的
java.util.concurrent.BlockingQueue
( 示例 )的实现。 注意 ,选择BlockingQueue
的原因是要等到下一个任务提供给队列; 因此,在循环+阻塞队列的情况下,您应该小心如何提供相同的行为和function。 -
而不是使用
Executors.new...
来创建一个新的ThreadPoolExecutor
使用直接构造函数 ,如
public ThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue
workQueue)
这样,除非您命令执行程序shutdown
,否则它将尝试从队列中获取下一个任务,以便从其工作队列中执行,该队列是任务的循环容器。
我建议以下解决方案完全使用标准库并发工具中存在的function。 它使用带有任务装饰器类的CyclicBarrier
和重新提交所有任务的屏障操作:
import java.util.ArrayList; import java.util.List; import java.util.concurrent.BrokenBarrierException; import java.util.concurrent.CyclicBarrier; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; public class Rotation { private static final class RotationDecorator implements Runnable { private final Runnable task; private final CyclicBarrier barrier; RotationDecorator( Runnable task, CyclicBarrier barrier ) { this.task = task; this.barrier = barrier; } @Override public void run() { this.task.run(); try { this.barrier.await(); } catch(InterruptedException e) { ; // Consider better exception handling } catch(BrokenBarrierException e) { ; // Consider better exception handling } } } public void startRotation( List tasks ) { final ExecutorService threadPoolService = Executors.newFixedThreadPool( 10 ); final List rotatingTasks = new ArrayList ( tasks.size() ); final CyclicBarrier barrier = new CyclicBarrier( tasks.size(), new Runnable() { @Override public void run() { Rotation.this.enqueueTasks( threadPoolService, rotatingTasks ); } } ); for(Runnable task : tasks) { rotatingTasks.add( new RotationDecorator( task, barrier ) ); } this.enqueueTasks( threadPoolService, rotatingTasks ); } private void enqueueTasks( ExecutorService service, List tasks ) { for(Runnable task : tasks) { service.submit( task ); } } }
您可以简单地检查所有任务是否已执行,并在情况发生后重新提交,例如:
List futures = new ArrayList<>(); for (Runnable task : myTasks) { futures.add(threadPoolService.submit(task)); } //wait until completion of all tasks for (Future f : futures) { f.get(); } //restart ......
编辑
您似乎想在任务完成后立即重新提交。 您可以使用ExecutorCompletionService ,它允许您在执行任务时检索任务, – 请参阅下面的一个简单示例,其中包含2个任务,一旦完成就会重新提交几次。 样本输出:
任务1提交了pool-1-thread-1
任务2提交了pool-1-thread-2
任务1完成了pool-1-thread-1
任务1提交了pool-1-thread-3
任务2完成了pool-1-thread-2
任务1完成了pool-1-thread-3
任务2提交了pool-1-thread-4
任务1提交了pool-1-thread-5
任务1完成了pool-1-thread-5
任务2完成了pool-1-thread-4
public class Test1 { public final ConcurrentMap concurrentMap = new ConcurrentHashMap<>(); public final AtomicInteger retries = new AtomicInteger(); public final Object lock = new Object(); public static void main(String[] args) throws InterruptedException, ExecutionException { int count = 0; List myTasks = new ArrayList<>(); myTasks.add(getRunnable(1)); myTasks.add(getRunnable(2)); ExecutorService threadPoolService = Executors.newFixedThreadPool(10); CompletionService ecs = new ExecutorCompletionService (threadPoolService); for (Runnable task : myTasks) { ecs.submit(task, task); } //wait until completion of all tasks while(count++ < 3) { Runnable task = ecs.take().get(); ecs.submit(task, task); } threadPoolService.shutdown(); } private static Runnable getRunnable(final int i) { return new Runnable() { @Override public void run() { System.out.println("Task " + i + " submitted " + Thread.currentThread().getName() + " "); try { Thread.sleep(500 * i); } catch (InterruptedException ex) { System.out.println("Interrupted"); } System.out.println("Task " + i + " completed " + Thread.currentThread().getName() + " "); } }; } }