如何在Java中实现类似DAG的调度程序?

我想在Java中实现一个简单的类似DAG的调度程序(不需要结果),如下图所示:

类似DAG的调度

我可以简单地使用手动代码来实现这个目的:

ExecutorService executor = Executors.newCachedThreadPool(); Future futureA = executor.submit(new Task("A")); Future futureC = executor.submit(new Task("C")); futureA.get(); Future futureB = executor.submit(new Task("B")); futureB.get(); futureC.get(); Future futureD = executor.submit(new Task("D")); futureD.get(); 

但我正在寻找一种更通用的方法,所以我可以像这样使用调度程序:

 Container container = new Container(); container.addTask("A", new Task("A")); container.addTask("B", new Task("B"), "A"); container.addTask("C", new Task("C")); container.addTask("D", new Task("D"), "B", "C"); container.waitForCompletion(); 

实际上我已经实现了一个简单的方法:

https://github.com/jizhang/micro-scheduler/blob/master/src/main/java/com/shzhangji/micro_scheduler/App.java

但我需要每隔100ms迭代一次所有任务,看看哪一个已准备好提交。 同样在这个实现中,没有exception检查。

我也检查了Guava lib的ListenableFuture,但我不知道如何正确使用它。

有关如何实现DAG或推荐现有开源调度程序的任何建议将不胜感激。

您正在寻找的是使用谷歌的guava库,它是可听的未来界面。 ListenableFutures允许您拥有复杂的异步操作链。 一旦使用allAsList方法完成任务B和C,就应该实现可监听的未来来执行任务D.

可听期货的文档: https : //code.google.com/p/guava-libraries/wiki/ListenableFutureExplained

关于可听期货的教程: http : //www.javacodegeeks.com/2013/02/listenablefuture-in-guava.html

使用allAsList,chain和transform方法的示例: http ://codingjunkie.net/google-guava-futures/

Dexecutor(免责声明:我是所有者)是您正在寻找的图书馆,这是一个例子

 public class WorkFlowManager { private final Dexecutor dexecutor; public WorkFlowManager(ExecutorService executorService) { this.dexecutor = buildDexecutor(executorService); buildGraph(); } private Dexecutor buildDexecutor(final ExecutorService executorService) { DexecutorConfig config = new DexecutorConfig<>(executorService, new WorkFlowTaskProvider()); return new DefaultDexecutor<>(config); } private void buildGraph() { this.dexecutor.addDependency(TaskOne.NAME, TaskTwo.NAME); this.dexecutor.addDependency(TaskTwo.NAME, TaskThree.NAME); this.dexecutor.addDependency(TaskTwo.NAME, TaskFour.NAME); this.dexecutor.addDependency(TaskTwo.NAME, TaskFive.NAME); this.dexecutor.addDependency(TaskFive.NAME, TaskSix.NAME); this.dexecutor.addAsDependentOnAllLeafNodes(TaskSeven.NAME); } public void execute() { this.dexecutor.execute(ExecutionConfig.TERMINATING); } } 

这将构建以下图表,并相应地执行。 Dexecutor Graph

请参考我如何? 了解更多细节

为何选择Dexecutor

  • 超轻量
  • 超快
  • 支持立即/预定重试逻辑
  • 支持非终止行为
  • 有条件地跳过任务执行
  • 良好的测试覆盖率,确保您免受伤害
  • 可在maven中心使用
  • 良好的文档量
  • 支持分发执行(Ignite,Hazelcast,Infinispan)

有用的链接

  • Dexecutor博客
  • Dexecutor网站
  • Dexecutor Wiki
  • Dexecutor源代码

另一个可能的方向是JavaRed Library ,它提供了一个用于编写和定义异步图流的接口,具有类似同步的习惯用法。

作为一个简短的例子,图形流程像这样复杂: 图执行流程

可以简单地实现:

 Result aResult = produceFutureOf(String.class).byExecuting(() -> executeA()); Result bResult = ifResult(aResult).succeed().produceFutureOf(String.class).byExecuting(a -> executeB(a)); Result cResult = ifResult(aResult).succeed().produceFutureOf(String.class).byExecuting(a -> executeC(a)); Result eResult = ifResult(aResult).succeed().produceFutureOf(String.class).byExecuting(a -> executeE(a)); Result dResult = ifResults(bResult, cResult).succeed().produceFutureOf(String.class).byExecuting((b, c) -> executeD(b, c)); Result fResult = ifResult(eResult).succeed().produceFutureOf(String.class).byExecuting(e -> executeF(e)); Result gResult = ifResult(fResult).succeed().produceFutureOf(String.class).byExecuting(f -> executeG(f)); return ifResults(dResult, gResult).succeed().produceFutureOf(String.class).byExecuting((d, g) -> executeH(d, g)); 

更多关于它的项目维基