我正在使用 flink 1.9 和 REST API/jobs/:jobid/savepoints来触发保存点并取消作业(优雅地停止作业以便稍后从保存点运行)。
我在源函数中使用了两阶段提交,所以我的源实现了CheckpointedFunction和CheckpointListener接口。在snapshotState()方法调用中,我将内部状态和notifyCheckpointComplete()检查点状态快照到第 3 方系统。
从源代码中我可以看到,只有snapshotState()部分是同步的CheckpointCoordinator-
// send the messages to the tasks that trigger their checkpoint
for (Execution execution: executions) {
if (props.isSynchronous()) {
execution.triggerSynchronousSavepoint(checkpointID, timestamp, checkpointOptions, advanceToEndOfTime);
} else {
execution.triggerCheckpoint(checkpointID, timestamp, checkpointOptions);
}
}
检查点确认和完成通知在AsyncCheckpointRunnable.
话虽如此,当savepointwith cancel-jobset totrue被触发时,在拍摄快照后,一些任务管理器会在作业取消和执行之前继续接收完成通知notifyCheckpointComplete(),而有些则不会。
问题是是否有办法使用保存点取消作业,以便notifyCheckpointComplete()保证在作业取消之前由所有任务管理器调用,或者目前没有办法实现这一点?