1

我启用了操作员检查点并为ProcessFunction操作员顺利工作。

在作业失败时,我可以看到操作员状态如何在snapshotState()挂钩上外部化,在恢复时,我可以看到状态如何在initializeState()挂钩上恢复。

但是,当我尝试CheckpointedFunction在 an 上实现接口和上述两种方法时AsyncFunction,它似乎不起作用。我所做的几乎与ProcessFunction...相同,但是当工作在失败后关闭时,它似乎并没有被snapshotState()钩住,并且在工作恢复时,context.isRestored()总是错误的。

为什么CheckpointedFunction.snapshotState()CheckpointedFunction.initializeState()没有被执行AsyncFunction但是是的ProcessFunction

编辑:由于某种原因,我的检查点需要很长时间。我相信我的配置非常标准,1 秒的间隔,500 毫秒的最小暂停,恰好一次。没有其他调音。
我从检查点协调员那里得到这些痕迹

o.a.f.s.r.t.SubtaskCheckpointCoordinatorImpl - Time from receiving all checkpoint barriers/RPC to executing it exceeded threshold: 93905ms
2021-11-23 16:25:01 INFO  o.a.f.r.c.CheckpointCoordinator - Completed checkpoint 4 for job 239d7967eac7900b33d7eadd483c9447 (671604 bytes in 112071 ms).

如果我尝试设置 checkpointTimeout,我需要按顺序或 5 分钟左右设置一些内容。这么小的状态(它只是一个 Counter 和一个 Long)的检查点怎么需要 5 分钟?

我还读到 NFS 卷是一个麻烦的秘诀,但到目前为止我还没有在集群上运行它,我只是在我的本地文件系统上测试它

4

1 回答 1

3

AsyncFunction根本不支持状态。原因是状态原语不同步,因此会在AsyncFunction. 这与没有 的原因相同KeyedAsyncFunction

如果 Flink 实现了https://cwiki.apache.org/confluence/display/FLINK/FLIP-22%3A+Eager+State+Declaration,那么它可以简单地在每个异步调用上附加状态并在成功异步时更新。

您可以在限制周围使用链式地图和插槽共享组做一些技巧,但它相当hacky。

于 2021-11-15T21:03:31.577 回答