4

我想在 Java 中实现一个简单的类似 DAG 的调度程序(不需要结果),如下图所示:

类 DAG 调度

我可以简单地使用手动代码来实现这一点:

ExecutorService executor = Executors.newCachedThreadPool();
Future<?> futureA = executor.submit(new Task("A"));
Future<?> futureC = executor.submit(new Task("C"));
futureA.get();
Future<?> futureB = executor.submit(new Task("B"));
futureB.get();
futureC.get();
Future<?> futureD = executor.submit(new Task("D"));
futureD.get();

但我正在寻找一种更通用的方法来做到这一点,所以我可以像这样使用调度程序:

Container container = new Container();
container.addTask("A", new Task("A"));
container.addTask("B", new Task("B"), "A");
container.addTask("C", new Task("C"));
container.addTask("D", new Task("D"), "B", "C");
container.waitForCompletion();

实际上我已经实现了一个简单的:

https://github.com/jizhang/micro-scheduler/blob/master/src/main/java/com/shzhangji/micro_scheduler/App.java

但我需要每 100 毫秒迭代一次所有任务,以查看准备提交的任务。同样在这个实现中没有异常检查。

我还检查了 Guava lib 的 ListenableFuture,但我不知道如何正确使用它。

任何有关如何实现 DAG 或推荐现有开源调度程序的建议都将不胜感激。

4

4 回答 4

6

您正在寻找可以使用谷歌的番石榴库和它的可听未来界面来完成。ListenableFutures 允许您拥有复杂的异步操作链。使用 allAsList 方法完成任务 B 和 C 后,您应该实现一个可听的未来来执行任务 D。

Listenable Futures 的文档: https ://code.google.com/p/guava-libraries/wiki/ListenableFutureExplained

关于可听期货的教程: http ://www.javacodegeeks.com/2013/02/listenablefuture-in-guava.html

使用 allAsList、chain 和 transform 方法的示例: http ://codingjunkie.net/google-guava-futures/

于 2013-10-02T22:27:19.113 回答
6

Dexecutor(免责声明:我是所有者)是您要查找的库,这是一个示例

public class WorkFlowManager {

    private final Dexecutor<String, Boolean> dexecutor;

    public WorkFlowManager(ExecutorService executorService) {
        this.dexecutor = buildDexecutor(executorService);

        buildGraph();
    }

    private Dexecutor<String, Boolean> buildDexecutor(final ExecutorService executorService) {
        DexecutorConfig<String, Boolean> config = new DexecutorConfig<>(executorService, new WorkFlowTaskProvider());
        return new DefaultDexecutor<>(config);
    }

    private void buildGraph() {
        this.dexecutor.addDependency(TaskOne.NAME, TaskTwo.NAME);
        this.dexecutor.addDependency(TaskTwo.NAME, TaskThree.NAME);
        this.dexecutor.addDependency(TaskTwo.NAME, TaskFour.NAME);
        this.dexecutor.addDependency(TaskTwo.NAME, TaskFive.NAME);
        this.dexecutor.addDependency(TaskFive.NAME, TaskSix.NAME);
        this.dexecutor.addAsDependentOnAllLeafNodes(TaskSeven.NAME);
    }

    public void execute() {
        this.dexecutor.execute(ExecutionConfig.TERMINATING);
    }
}

这将构建以下图表,并且执行将相应地进行。 执行器图

请参阅我该怎么做?了解更多详情

为什么选择执行者

  • 超轻量
  • 超快
  • 支持立即/计划重试逻辑
  • 支持非终止行为
  • 有条件地跳过任务执行
  • 良好的测试覆盖率,让您免受伤害
  • 在 Maven 中央可用
  • 大量的文档
  • 支持分布式执行(Ignite、Hazelcast、Infinispan)

有用的链接

于 2016-11-08T18:00:05.003 回答
0

另一个可能的方向是JavaRed 库,它提供了一个接口,用于使用类似同步的习惯用法来编写和定义异步图流。

举个简短​​的例子,一个像这个一样复杂的图流: 图执行流程

可以简单地实现:

Result<String> aResult = produceFutureOf(String.class).byExecuting(() -> executeA());
Result<String> bResult = ifResult(aResult).succeed().produceFutureOf(String.class).byExecuting(a -> executeB(a));
Result<String> cResult = ifResult(aResult).succeed().produceFutureOf(String.class).byExecuting(a -> executeC(a));
Result<String> eResult = ifResult(aResult).succeed().produceFutureOf(String.class).byExecuting(a -> executeE(a));
Result<String> dResult = ifResults(bResult, cResult).succeed().produceFutureOf(String.class).byExecuting((b, c) -> executeD(b, c));
Result<String> fResult = ifResult(eResult).succeed().produceFutureOf(String.class).byExecuting(e -> executeF(e));
Result<String> gResult = ifResult(fResult).succeed().produceFutureOf(String.class).byExecuting(f -> executeG(f));
return ifResults(dResult, gResult).succeed().produceFutureOf(String.class).byExecuting((d, g) -> executeH(d, g));

有关它的更多信息,请参阅项目 wiki

于 2017-04-24T13:07:21.840 回答
0

我们可以使用 Java8 CompletableFuture 来解决你的问题,核心是这样的:

for (Task dependedentTask : dependedentTasks) {
    // get dependedentFuture by dependedentTask,for example
    CompletableFuture dependedentFuture = futureMap.get(dependedentTask);
    dependedentFutures.add(dependedentFuture);
}
CompletableFuture.allOf(dependedentFutures).thenRunAsync(current task execute it's method)...

希望有人编码。

于 2021-02-06T10:34:17.967 回答