如何在Java中实现类似DAG的调度程序?
我想在Java中实现一个简单的类似DAG的调度程序(不需要结果),如下图所示:
我可以简单地使用手动代码来实现这个目的:
ExecutorService executor = Executors.newCachedThreadPool(); Future futureA = executor.submit(new Task("A")); Future futureC = executor.submit(new Task("C")); futureA.get(); Future futureB = executor.submit(new Task("B")); futureB.get(); futureC.get(); Future futureD = executor.submit(new Task("D")); futureD.get();
但我正在寻找一种更通用的方法,所以我可以像这样使用调度程序:
Container container = new Container(); container.addTask("A", new Task("A")); container.addTask("B", new Task("B"), "A"); container.addTask("C", new Task("C")); container.addTask("D", new Task("D"), "B", "C"); container.waitForCompletion();
实际上我已经实现了一个简单的方法:
https://github.com/jizhang/micro-scheduler/blob/master/src/main/java/com/shzhangji/micro_scheduler/App.java
但我需要每隔100ms迭代一次所有任务,看看哪一个已准备好提交。 同样在这个实现中,没有exception检查。
我也检查了Guava lib的ListenableFuture,但我不知道如何正确使用它。
有关如何实现DAG或推荐现有开源调度程序的任何建议将不胜感激。
您正在寻找的是使用谷歌的guava库,它是可听的未来界面。 ListenableFutures允许您拥有复杂的异步操作链。 一旦使用allAsList方法完成任务B和C,就应该实现可监听的未来来执行任务D.
可听期货的文档: https : //code.google.com/p/guava-libraries/wiki/ListenableFutureExplained
关于可听期货的教程: http : //www.javacodegeeks.com/2013/02/listenablefuture-in-guava.html
使用allAsList,chain和transform方法的示例: http ://codingjunkie.net/google-guava-futures/
Dexecutor(免责声明:我是所有者)是您正在寻找的图书馆,这是一个例子
public class WorkFlowManager { private final Dexecutor dexecutor; public WorkFlowManager(ExecutorService executorService) { this.dexecutor = buildDexecutor(executorService); buildGraph(); } private Dexecutor buildDexecutor(final ExecutorService executorService) { DexecutorConfig config = new DexecutorConfig<>(executorService, new WorkFlowTaskProvider()); return new DefaultDexecutor<>(config); } private void buildGraph() { this.dexecutor.addDependency(TaskOne.NAME, TaskTwo.NAME); this.dexecutor.addDependency(TaskTwo.NAME, TaskThree.NAME); this.dexecutor.addDependency(TaskTwo.NAME, TaskFour.NAME); this.dexecutor.addDependency(TaskTwo.NAME, TaskFive.NAME); this.dexecutor.addDependency(TaskFive.NAME, TaskSix.NAME); this.dexecutor.addAsDependentOnAllLeafNodes(TaskSeven.NAME); } public void execute() { this.dexecutor.execute(ExecutionConfig.TERMINATING); } }
这将构建以下图表,并相应地执行。
请参考我如何? 了解更多细节
为何选择Dexecutor
- 超轻量
- 超快
- 支持立即/预定重试逻辑
- 支持非终止行为
- 有条件地跳过任务执行
- 良好的测试覆盖率,确保您免受伤害
- 可在maven中心使用
- 良好的文档量
- 支持分发执行(Ignite,Hazelcast,Infinispan)
有用的链接
- Dexecutor博客
- Dexecutor网站
- Dexecutor Wiki
- Dexecutor源代码
另一个可能的方向是JavaRed Library ,它提供了一个用于编写和定义异步图流的接口,具有类似同步的习惯用法。
作为一个简短的例子,图形流程像这样复杂:
可以简单地实现:
Result aResult = produceFutureOf(String.class).byExecuting(() -> executeA()); Result bResult = ifResult(aResult).succeed().produceFutureOf(String.class).byExecuting(a -> executeB(a)); Result cResult = ifResult(aResult).succeed().produceFutureOf(String.class).byExecuting(a -> executeC(a)); Result eResult = ifResult(aResult).succeed().produceFutureOf(String.class).byExecuting(a -> executeE(a)); Result dResult = ifResults(bResult, cResult).succeed().produceFutureOf(String.class).byExecuting((b, c) -> executeD(b, c)); Result fResult = ifResult(eResult).succeed().produceFutureOf(String.class).byExecuting(e -> executeF(e)); Result gResult = ifResult(fResult).succeed().produceFutureOf(String.class).byExecuting(f -> executeG(f)); return ifResults(dResult, gResult).succeed().produceFutureOf(String.class).byExecuting((d, g) -> executeH(d, g));
更多关于它的项目维基