我启用了操作员检查点并为ProcessFunction
操作员顺利工作。
在作业失败时,我可以看到操作员状态如何在snapshotState()
挂钩上外部化,在恢复时,我可以看到状态如何在initializeState()
挂钩上恢复。
但是,当我尝试CheckpointedFunction
在 an 上实现接口和上述两种方法时AsyncFunction
,它似乎不起作用。我所做的几乎与ProcessFunction
...相同,但是当工作在失败后关闭时,它似乎并没有被snapshotState()
钩住,并且在工作恢复时,context.isRestored()
总是错误的。
为什么CheckpointedFunction.snapshotState()
和CheckpointedFunction.initializeState()
没有被执行AsyncFunction
但是是的ProcessFunction
?
编辑:由于某种原因,我的检查点需要很长时间。我相信我的配置非常标准,1 秒的间隔,500 毫秒的最小暂停,恰好一次。没有其他调音。
我从检查点协调员那里得到这些痕迹
o.a.f.s.r.t.SubtaskCheckpointCoordinatorImpl - Time from receiving all checkpoint barriers/RPC to executing it exceeded threshold: 93905ms
2021-11-23 16:25:01 INFO o.a.f.r.c.CheckpointCoordinator - Completed checkpoint 4 for job 239d7967eac7900b33d7eadd483c9447 (671604 bytes in 112071 ms).
如果我尝试设置 checkpointTimeout,我需要按顺序或 5 分钟左右设置一些内容。这么小的状态(它只是一个 Counter 和一个 Long)的检查点怎么需要 5 分钟?
我还读到 NFS 卷是一个麻烦的秘诀,但到目前为止我还没有在集群上运行它,我只是在我的本地文件系统上测试它