使用ExecutorService控制任务执行顺序

我有一个将异步任务委托给线程池的进程。 我需要确保按顺序执行某些任务。 所以举个例子

任务按顺序到达

任务a1,b1,c1,d1,e1,a2,a3,b2,f1

任务可以按任何顺序执行,除非存在自然依赖性,因此必须按顺序处理a1,a2,a3,方法是分配到同一个线程或阻止这些,直到我知道之前的#任务完成为止。

目前它不使用Java Concurrency包,但我正在考虑改变以利用线程管理。

有没有人有类似的解决方案或如何实现这一目标的建议

当我在过去完成此操作时,我通常会将一个组件处理,然后将callables / runnables提交给Executor。

就像是。

  • 获得要运行的任务列表,其中一些具有依赖项
  • 创建一个Executor并使用ExecutorCompletionService进行换行
  • 搜索所有任何没有依赖关系的任务,通过完成服务安排它们
  • 轮询完成服务
  • 每项任务完成
    • 将其添加到“已完成”列表中
    • 重新评估任何等待任务到“完成列表”,看看它们是否“依赖完成”。 如果是这样安排他们
    • 冲洗重复,直到提交/完成所有任务

完成服务是一个很好的方式,能够完成任务,而不是试图轮询一堆期货。 但是,您可能希望保留一个Map ,当任务通过完成服务进行调度时Map被填充,这样当完成服务为您提供完整的Future时,您TaskIdentifier它是哪个TaskIdentifier

如果您发现自己处于任务仍在等待运行的状态,但没有任何运行且无法安排任何事情,那么您就会遇到循环依赖问题。

我编写了自己的Executor,它保证了具有相同密钥的任务的任务排序。 它使用具有相同键的订单任务的队列映射。 每个键控任务使用相同的键执行下一个任务。

此解决方案不处理RejectedExecutionException或委派的Executor中的其他exception! 因此委派的执行者应该是“无限的”。

 import java.util.HashMap; import java.util.LinkedList; import java.util.Map; import java.util.Queue; import java.util.concurrent.Executor; /** * This Executor warrants task ordering for tasks with same key (key have to implement hashCode and equal methods correctly). */ public class OrderingExecutor implements Executor{ private final Executor delegate; private final Map> keyedTasks = new HashMap>(); public OrderingExecutor(Executor delegate){ this.delegate = delegate; } @Override public void execute(Runnable task) { // task without key can be executed immediately delegate.execute(task); } public void execute(Runnable task, Object key) { if (key == null){ // if key is null, execute without ordering execute(task); return; } boolean first; Runnable wrappedTask; synchronized (keyedTasks){ Queue dependencyQueue = keyedTasks.get(key); first = (dependencyQueue == null); if (dependencyQueue == null){ dependencyQueue = new LinkedList(); keyedTasks.put(key, dependencyQueue); } wrappedTask = wrap(task, dependencyQueue, key); if (!first) dependencyQueue.add(wrappedTask); } // execute method can block, call it outside synchronize block if (first) delegate.execute(wrappedTask); } private Runnable wrap(Runnable task, Queue dependencyQueue, Object key) { return new OrderedTask(task, dependencyQueue, key); } class OrderedTask implements Runnable{ private final Queue dependencyQueue; private final Runnable task; private final Object key; public OrderedTask(Runnable task, Queue dependencyQueue, Object key) { this.task = task; this.dependencyQueue = dependencyQueue; this.key = key; } @Override public void run() { try{ task.run(); } finally { Runnable nextTask = null; synchronized (keyedTasks){ if (dependencyQueue.isEmpty()){ keyedTasks.remove(key); }else{ nextTask = dependencyQueue.poll(); } } if (nextTask!=null) delegate.execute(nextTask); } } } } 

当您向Runnable提交RunnableCallable ,您会收到Future 。 让依赖于a1的线程传递给a1的Future并调用Future.get() 。 这将阻塞直到线程完成。

所以:

 ExecutorService exec = Executor.newFixedThreadPool(5); Runnable a1 = ... final Future f1 = exec.submit(a1); Runnable a2 = new Runnable() { @Override public void run() { f1.get(); ... // do stuff } } exec.submit(a2); 

等等。

另一种选择是创建自己的执行程序,将其命名为OrderedExecutor,并创建一个封装的ThreadPoolExecutor对象数组,每个内部执行程序有1个线程。 然后,您提供了一种选择其中一个内部对象的机制,例如,您可以通过提供类的用户可以实现的接口来实现:

 executor = new OrderedExecutor(10 / * pool size * /,new OrderedExecutor.Chooser(){
   public int choose(Runnable runnable){
      MyRunnable myRunnable =(MyRunnable)runnable;
      return myRunnable.someId();
   });

 executor.execute(new MyRunnable());

然后,OrderedExecutor.execute()的实现将使用Chooser获取一个int,你用池大小来修改它,这是你对内部数组的索引。 想法是“someId()”将为所有“a”返回相同的值,等等。

您可以使用Executors.newSingleThreadExecutor(),但它只使用一个线程来执行您的任务。 另一种选择是使用CountDownLatch。 这是一个简单的例子:

 public class Main2 { public static void main(String[] args) throws InterruptedException { final CountDownLatch cdl1 = new CountDownLatch(1); final CountDownLatch cdl2 = new CountDownLatch(1); final CountDownLatch cdl3 = new CountDownLatch(1); List list = new ArrayList(); list.add(new Runnable() { public void run() { System.out.println("Task 1"); // inform that task 1 is finished cdl1.countDown(); } }); list.add(new Runnable() { public void run() { // wait until task 1 is finished try { cdl1.await(); } catch (InterruptedException e) { e.printStackTrace(); } System.out.println("Task 2"); // inform that task 2 is finished cdl2.countDown(); } }); list.add(new Runnable() { public void run() { // wait until task 2 is finished try { cdl2.await(); } catch (InterruptedException e) { e.printStackTrace(); } System.out.println("Task 3"); // inform that task 3 is finished cdl3.countDown(); } }); ExecutorService es = Executors.newFixedThreadPool(200); for (int i = 0; i < 3; i++) { es.submit(list.get(i)); } es.shutdown(); es.awaitTermination(1, TimeUnit.MINUTES); } } 

在Habanero-Java库中 ,有一个数据驱动任务的概念,可用于表示任务之间的依赖关系并避免线程阻塞操作。 在幕后,Habanero-Java库使用JDKs ForkJoinPool(即ExecutorService)。

例如,任务A1,A2,A3,……的用例可表示如下:

 HjFuture a1 = future(() -> { doA1(); return true; }); HjFuture a2 = futureAwait(a1, () -> { doA2(); return true; }); HjFuture a3 = futureAwait(a2, () -> { doA3(); return true; }); 

请注意,a1,a2和a3只是对HjFuture类型的对象的引用,并且可以在自定义数据结构中进行维护,以指定任务A2和A3在运行时进入时的依赖关系。

有一些教程幻灯片可用 。 您可以在javadoc , API摘要和引物中找到更多文档。

我为这个问题创建了一个OrderingExecutor。 如果将相同的密钥传递给具有不同runnable的方法execute(),则具有相同密钥的runnable的执行将按调用execute()的顺序执行,并且永远不会重叠。

 import java.util.Arrays; import java.util.Collection; import java.util.Iterator; import java.util.Queue; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentLinkedQueue; import java.util.concurrent.ConcurrentMap; import java.util.concurrent.Executor; /** * Special executor which can order the tasks if a common key is given. * Runnables submitted with non-null key will guaranteed to run in order for the same key. * */ public class OrderedExecutor { private static final Queue EMPTY_QUEUE = new QueueWithHashCodeAndEquals( new ConcurrentLinkedQueue()); private ConcurrentMap> taskMap = new ConcurrentHashMap>(); private Executor delegate; private volatile boolean stopped; public OrderedExecutor(Executor delegate) { this.delegate = delegate; } public void execute(Runnable runnable, Object key) { if (stopped) { return; } if (key == null) { delegate.execute(runnable); return; } Queue queueForKey = taskMap.computeIfPresent(key, (k, v) -> { v.add(runnable); return v; }); if (queueForKey == null) { // There was no running task with this key Queue newQ = new QueueWithHashCodeAndEquals(new ConcurrentLinkedQueue()); newQ.add(runnable); // Use putIfAbsent because this execute() method can be called concurrently as well queueForKey = taskMap.putIfAbsent(key, newQ); if (queueForKey != null) queueForKey.add(runnable); delegate.execute(new InternalRunnable(key)); } } public void shutdown() { stopped = true; taskMap.clear(); } /** * Own Runnable used by OrderedExecutor. * The runnable is associated with a specific key - the Queue<Runnable> for this * key is polled. * If the queue is empty, it tries to remove the queue from taskMap. * */ private class InternalRunnable implements Runnable { private Object key; public InternalRunnable(Object key) { this.key = key; } @Override public void run() { while (true) { // There must be at least one task now Runnable r = taskMap.get(key).poll(); while (r != null) { r.run(); r = taskMap.get(key).poll(); } // The queue emptied // Remove from the map if and only if the queue is really empty boolean removed = taskMap.remove(key, EMPTY_QUEUE); if (removed) { // The queue has been removed from the map, // if a new task arrives with the same key, a new InternalRunnable // will be created break; } // If the queue has not been removed from the map it means that someone put a task into it // so we can safely continue the loop } } } /** * Special Queue implementation, with equals() and hashCode() methods. * By default, Java SE queues use identity equals() and default hashCode() methods. * This implementation uses Arrays.equals(Queue::toArray()) and Arrays.hashCode(Queue::toArray()). * * @param  The type of elements in the queue. */ private static class QueueWithHashCodeAndEquals implements Queue { private Queue delegate; public QueueWithHashCodeAndEquals(Queue delegate) { this.delegate = delegate; } public boolean add(E e) { return delegate.add(e); } public boolean offer(E e) { return delegate.offer(e); } public int size() { return delegate.size(); } public boolean isEmpty() { return delegate.isEmpty(); } public boolean contains(Object o) { return delegate.contains(o); } public E remove() { return delegate.remove(); } public E poll() { return delegate.poll(); } public E element() { return delegate.element(); } public Iterator iterator() { return delegate.iterator(); } public E peek() { return delegate.peek(); } public Object[] toArray() { return delegate.toArray(); } public  T[] toArray(T[] a) { return delegate.toArray(a); } public boolean remove(Object o) { return delegate.remove(o); } public boolean containsAll(Collection c) { return delegate.containsAll(c); } public boolean addAll(Collection c) { return delegate.addAll(c); } public boolean removeAll(Collection c) { return delegate.removeAll(c); } public boolean retainAll(Collection c) { return delegate.retainAll(c); } public void clear() { delegate.clear(); } @Override public boolean equals(Object obj) { if (!(obj instanceof QueueWithHashCodeAndEquals)) { return false; } QueueWithHashCodeAndEquals other = (QueueWithHashCodeAndEquals) obj; return Arrays.equals(toArray(), other.toArray()); } @Override public int hashCode() { return Arrays.hashCode(toArray()); } } } 

我写了我的赢得执行者服务,它是序列感知的。 它对包含某些相关参考和当前飞行的任务进行排序。

您可以通过https://github.com/nenapu/SequenceAwareExecutorService进行实施