0

我正在使用 Yarn 来运行 Flink 作业。对于每个 Flink 作业,我都在创建一个检查点。

我提交了一个在我的 Yarn 集群中运行的 Flink 作业。我有一个轮询作业,它检查 Yarn 上的作业是否失败并重新启动它。当再次提交作业时,Yarn 会为此 Flink 作业创建一个新的 application_id。如何配置重新提交的 Flink 作业以使用重新启动的 Flink 作业的检查点。

我已经 state.savepoints.dir = hdfs://localhost:9000/checkpoint/在 flink-conf.yaml 中设置了 conf`

在创建 Flink 作业时, streamExecutionEnvironment.setStateBackend(new FsStateBackend("hdfs://localhost:9000/checkpoint/uuid-job-1"));

当我进行此设置时,检查点保存在 conf 文件 ( hdfs://localhost:9000/checkpoint/) 中指定的路径中,而不是我在创建 Flink 作业时设置的路径中。

任何帮助将不胜感激。谢谢!

4

1 回答 1

2

不幸的是,你不能从旧的检查点开始新的工作。您可以做的是使用外部检查点。flink <=1.5 的一个缺点是外部检查点的元数据存储在由 config 参数设置的所有作业的单个目录中:state.checkpoints.dir. 但是您可以在每次提交之前对其进行更改。

来自邮件列表线程的附加说明:

好消息是 Flink 1.5 将重新设计外部检查点的工作方式:基本上,所有检查点现在都可以被认为是外部化的,元数据将存储在检查点的根目录中,而不是所有作业的一个全局目录中。这样,外部化检查点的元数据位于每个作业的检查点目录中,并且从中恢复应该相当简单。

于 2018-05-23T08:39:08.687 回答