1

我在 kubernetes 集群中使用带有作业管理器 pod 和两个任务管理器 pod 的 Flink 集群。当我将流式作业提交给作业管理器时,它会运行作业并将输出接收到接收器中。我还启用了检查点以从故障中恢复。现在,当我故意删除一个任务管理器 pod 以验证 flink 中的节点故障处理时,我看到一些假设到达接收器的记录没有收到。当 Kubernetes 自动重启 Pod 时,它会继续处理记录,但不会从检查点恢复。我正在使用以下命令提交作业

flink run -Dparallelism=2 -m localhost:<port> -c <flink job> -p=2 <flink job>.jar

我在工作环境中有以下内容:

 env.enableCheckpointing(10000)
    env.getCheckpointConfig.setPreferCheckpointForRecovery(true)
   
    env.getCheckpointConfig.enableExternalizedCheckpoints(ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION)
env.setStateBackend(new FsStateBackend(Paths.get(<checkpoint path>).toUri, false))

当任务管理器 pod 重新启动时,我有以下日志。

2020-10-01 10:01:30,096 INFO  org.apache.flink.runtime.blob.BlobClient                     [] - Downloading 2966c462794bf94523e9a53c1d9a2f13/p-421bdbdb924a09ddc017b854d52d9a9457109d43-7307729d1f0408de20cd29e352a2a655 from flinksessioncluster-sample-jobmanager/172.20.225.40:6124

但是在检查点目录 2966c462794bf94523e9a53c1d9a2f13 我只有以下项目。

chk-299  shared  taskowned

我在目录 2966c462794bf94523e9a53c1d9a2f13 中没有目录 p-421bdbdb924a09ddc017b854d52d9a9457109d43-7307729d1f0408de20cd29e352a2a655

根据文档,任务应该自动从检查点位置恢复。

请让我知道这可能是哪里的问题。

更新

进行了实际测试 -

以“t”秒间隔将记录连续插入到 flink 作业中。当任务管理器处理记录时,我杀死了一个任务管理器 pod。此时我停止将记录插入到 flink 作业中。在作业的输入端,我向其中插入了 1000 条记录。当任务管理器再次出现时,我在接收器中有 700 条记录。

现在我开始一次插入一条记录,发现 sink 中的记录突然增加到 940 条,然后开始增加 1,即任务管理器崩溃后插入的记录开始进入 sink。但是我从最初的 1000 条记录中丢失了 60 条记录,这些记录是在任务管理器崩溃之前插入的

4

0 回答 0