我正在使用 flink 1.9 和 REST API/jobs/:jobid/savepoints
来触发保存点并取消作业(优雅地停止作业以便稍后从保存点运行)。
我在源函数中使用了两阶段提交,所以我的源实现了CheckpointedFunction
和CheckpointListener
接口。在snapshotState()
方法调用中,我将内部状态和notifyCheckpointComplete()
检查点状态快照到第 3 方系统。
从源代码中我可以看到,只有snapshotState()
部分是同步的CheckpointCoordinator
-
// send the messages to the tasks that trigger their checkpoint
for (Execution execution: executions) {
if (props.isSynchronous()) {
execution.triggerSynchronousSavepoint(checkpointID, timestamp, checkpointOptions, advanceToEndOfTime);
} else {
execution.triggerCheckpoint(checkpointID, timestamp, checkpointOptions);
}
}
检查点确认和完成通知在AsyncCheckpointRunnable
.
话虽如此,当savepoint
with cancel-job
set totrue
被触发时,在拍摄快照后,一些任务管理器会在作业取消和执行之前继续接收完成通知notifyCheckpointComplete()
,而有些则不会。
问题是是否有办法使用保存点取消作业,以便notifyCheckpointComplete()
保证在作业取消之前由所有任务管理器调用,或者目前没有办法实现这一点?