
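I'm using Flink 1.9 and the REST API /jobs/:jobid/savepoints to trigger a savepoint and cancel the job (i.e. stop the job gracefully so it can later be resumed from the savepoint).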
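For reference, a trigger-and-cancel call like this might look like the following minimal sketch. It uses Java 11's java.net.http client and assumes the Flink 1.9 request-body fields `target-directory` and `cancel-job` of POST /jobs/:jobid/savepoints. The class name, REST address, job ID and savepoint directory are hypothetical placeholders, and the JSON is built by hand without escaping.

```java
import java.net.URI;
import java.net.http.HttpClient;
import java.net.http.HttpRequest;
import java.net.http.HttpResponse;

// Hypothetical helper: builds POST /jobs/:jobid/savepoints with cancel-job=true.
class CancelWithSavepoint {
    // Request body with the (assumed) Flink 1.9 field names; no JSON escaping.
    static String body(String targetDirectory, boolean cancelJob) {
        return "{\"target-directory\": \"" + targetDirectory + "\", \"cancel-job\": " + cancelJob + "}";
    }

    static HttpRequest request(String restBase, String jobId, String targetDirectory) {
        return HttpRequest.newBuilder(URI.create(restBase + "/jobs/" + jobId + "/savepoints"))
                .header("Content-Type", "application/json")
                .POST(HttpRequest.BodyPublishers.ofString(body(targetDirectory, true)))
                .build();
    }

    public static void main(String[] args) throws Exception {
        // Placeholder values: pass a real job ID as the first argument to actually send.
        String restBase = "http://localhost:8081";
        String jobId = args.length > 0 ? args[0] : "00000000000000000000000000000000";
        HttpRequest req = request(restBase, jobId, "file:///tmp/savepoints");
        if (args.length > 0) {
            HttpResponse<String> resp =
                    HttpClient.newHttpClient().send(req, HttpResponse.BodyHandlers.ofString());
            // The response carries a "request-id" to poll at /jobs/:jobid/savepoints/:triggerid
            System.out.println(resp.body());
        } else {
            System.out.println(req.method() + " " + req.uri());
        }
    }
}
```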

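I use a two-phase commit in my source function, so my source implements both the CheckpointedFunction and CheckpointListener interfaces. In the snapshotState() call I snapshot the internal state, and in notifyCheckpointComplete() I commit the checkpointed state to a 3rd-party system.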
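The commit pattern described above can be modeled without Flink as follows. This is a plain-Java sketch: `TwoPhaseCommitModel` and its `committed` list (standing in for the 3rd-party system) are hypothetical, and the methods only mirror the names of CheckpointedFunction/CheckpointListener. It keys the pre-committed state by checkpoint ID and, on notification, commits everything up to and including that ID, since Flink does not promise a notification for every checkpoint (a later one can subsume earlier ones).

```java
import java.util.ArrayList;
import java.util.List;
import java.util.NavigableMap;
import java.util.TreeMap;

// Plain-Java model of a source-side two-phase commit; no Flink classes are used.
class TwoPhaseCommitModel {
    // Pre-committed state per checkpoint ID, waiting for notifyCheckpointComplete().
    private final NavigableMap<Long, String> pending = new TreeMap<>();
    // Stand-in for the hypothetical 3rd-party system.
    final List<String> committed = new ArrayList<>();

    // Phase 1: called when the checkpoint barrier reaches the source.
    void snapshotState(long checkpointId, String state) {
        pending.put(checkpointId, state);
    }

    // Phase 2: commit everything up to and including checkpointId, in ID order.
    void notifyCheckpointComplete(long checkpointId) {
        NavigableMap<Long, String> done = pending.headMap(checkpointId, true);
        committed.addAll(done.values());
        done.clear(); // clearing the view removes the entries from 'pending'
    }

    boolean hasPendingCommits() {
        return !pending.isEmpty();
    }

    public static void main(String[] args) {
        TwoPhaseCommitModel m = new TwoPhaseCommitModel();
        m.snapshotState(1, "offset=10");
        m.snapshotState(2, "offset=20");
        m.notifyCheckpointComplete(2); // the notification for checkpoint 1 never arrived
        System.out.println(m.committed); // [offset=10, offset=20]
        System.out.println(m.hasPendingCommits()); // false
    }
}
```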

From the source code I can see that only the snapshotState() part is synchronous, in CheckpointCoordinator:

// send the messages to the tasks that trigger their checkpoint
for (Execution execution: executions) {
    if (props.isSynchronous()) {
        execution.triggerSynchronousSavepoint(checkpointID, timestamp, checkpointOptions, advanceToEndOfTime);
    } else {
        execution.triggerCheckpoint(checkpointID, timestamp, checkpointOptions);
    }
}

Checkpoint acknowledgement and the completion notification happen in AsyncCheckpointRunnable.

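That said, when a savepoint with cancel-job set to true is triggered, after the snapshot has been taken some Task Managers still receive the completion notification and execute notifyCheckpointComplete() before the job is cancelled, while others don't.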

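The question is: is there a way to cancel a job with a savepoint such that notifyCheckpointComplete() is guaranteed to be called on all Task Managers before the job is cancelled, or is there currently no way to achieve this?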


2 Answers


It's been a while since I looked at Flink 1.9, so please take my answer with a grain of salt.

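My guess is that your sources are cancelled too early. notifyCheckpointComplete is actually sent to all tasks, but some SourceFunctions have already exited run() and the respective tasks have already been cleaned up.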

AFAIK, it should work if you ignore cancellation and interruptions until you have received the last notifyCheckpointComplete.

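import org.apache.flink.runtime.state.CheckpointListener;
import org.apache.flink.runtime.state.FunctionInitializationContext;
import org.apache.flink.runtime.state.FunctionSnapshotContext;
import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction;
import org.apache.flink.streaming.api.functions.source.SourceFunction;

class YourSource implements SourceFunction<Object>, CheckpointListener, CheckpointedFunction {
    private volatile boolean canceled = false;
    private volatile boolean pendingCheckpoint = false;

    @Override
    public void snapshotState(FunctionSnapshotContext context) throws Exception {
        pendingCheckpoint = true;
        // start two-phase commit (pre-commit the state for context.getCheckpointId())
    }

    @Override
    public void initializeState(FunctionInitializationContext context) throws Exception {
        // restore state here
    }

    @Override
    public void notifyCheckpointComplete(long checkpointId) throws Exception {
        // finish two-phase commit
        pendingCheckpoint = false;
    }

    @Override
    public void run(SourceContext<Object> ctx) throws Exception {
        while (!canceled) {
            // do normal source stuff (emit records while holding ctx.getCheckpointLock())
        }
        // keep the task running after cancellation until the pending commit is done
        while (pendingCheckpoint) {
            try {
                Thread.sleep(1);
            } catch (InterruptedException e) {
                // ignore interruptions until two-phase commit is done
            }
        }
    }

    @Override
    public void cancel() {
        canceled = true;
    }
}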
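The waiting part can also be expressed without the 1 ms polling loop. The following plain-Java sketch has no Flink dependency; `DrainingSourceModel` and `simulate()` are hypothetical names, and it assumes a single outstanding savepoint. It replaces the `pendingCheckpoint` flag with a `CountDownLatch` that notifyCheckpointComplete() counts down, and shows that run() only returns once the commit notification has arrived, even when cancel() came first.

```java
import java.util.concurrent.CountDownLatch;

// Plain-Java model (no Flink classes) of a source whose run() outlives cancel()
// until the pending commit has been finished.
class DrainingSourceModel {
    private volatile boolean canceled = false;
    private volatile boolean snapshotTaken = false;
    // Counted down by notifyCheckpointComplete(); replaces the polling loop.
    private final CountDownLatch commitDone = new CountDownLatch(1);

    void snapshotState() { snapshotTaken = true; }

    void notifyCheckpointComplete() { commitDone.countDown(); }

    void cancel() { canceled = true; }

    void run() {
        while (!canceled) {
            Thread.onSpinWait(); // placeholder for normal source work
        }
        if (!snapshotTaken) {
            return; // nothing to commit
        }
        boolean interrupted = false;
        while (true) {
            try {
                commitDone.await(); // block until the commit notification arrives
                break;
            } catch (InterruptedException e) {
                interrupted = true; // ignore interruptions until the commit is done
            }
        }
        if (interrupted) {
            Thread.currentThread().interrupt(); // restore the flag for the caller
        }
    }

    // Cancel arrives before the notification; returns {aliveBeforeNotify, aliveAfterNotify}.
    static boolean[] simulate() {
        DrainingSourceModel src = new DrainingSourceModel();
        Thread task = new Thread(src::run);
        task.start();
        src.snapshotState(); // savepoint barrier reaches the source
        src.cancel();        // cancel-job arrives before the commit notification
        try {
            task.join(100);
            boolean aliveBeforeNotify = task.isAlive();
            src.notifyCheckpointComplete();
            task.join();
            return new boolean[] {aliveBeforeNotify, task.isAlive()};
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
    }

    public static void main(String[] args) {
        boolean[] r = simulate();
        System.out.println("alive before notify: " + r[0] + ", after: " + r[1]);
    }
}
```

`simulate()` returns `{true, false}`: the task is still alive 100 ms after cancel() and terminates only after the notification. Since Flink periodically interrupts a task that does not stop after cancellation and eventually gives up on it (`task.cancellation.interval`, `task.cancellation.timeout`), the wait for the commit should stay short.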
answered 2020-08-07T13:30:06.910

Wouldn't using stop-with-savepoint [1][2] solve the problem?

[1] https://ci.apache.org/projects/flink/flink-docs-stable/monitoring/rest_api.html#jobs-jobid-stop
[2] https://ci.apache.org/projects/flink/flink-docs-stable/ops/cli.html
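A minimal sketch of the stop-with-savepoint REST call [1], assuming the request-body fields `targetDirectory` and `drain` of POST /jobs/:jobid/stop. The class name, REST address, job ID and directory are hypothetical placeholders, and the JSON is built by hand without escaping. As far as I can tell, this is the path that makes `props.isSynchronous()` true in the CheckpointCoordinator snippet from the question, so each task should wait for the savepoint's notifyCheckpointComplete() before finishing.

```java
import java.net.URI;
import java.net.http.HttpRequest;

// Hypothetical helper: builds POST /jobs/:jobid/stop (stop-with-savepoint).
class StopWithSavepoint {
    // Request body with the (assumed) field names of the stop endpoint; no JSON escaping.
    static String body(String targetDirectory, boolean drain) {
        return "{\"targetDirectory\": \"" + targetDirectory + "\", \"drain\": " + drain + "}";
    }

    static HttpRequest request(String restBase, String jobId, String targetDirectory) {
        return HttpRequest.newBuilder(URI.create(restBase + "/jobs/" + jobId + "/stop"))
                .header("Content-Type", "application/json")
                // drain=false: stop without advancing event time to the end
                .POST(HttpRequest.BodyPublishers.ofString(body(targetDirectory, false)))
                .build();
    }

    public static void main(String[] args) {
        HttpRequest req = request("http://localhost:8081",
                "00000000000000000000000000000000", "file:///tmp/savepoints");
        System.out.println(req.method() + " " + req.uri());
    }
}
```

The CLI equivalent [2] should be `flink stop -p <savepointPath> <jobId>`.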

answered 2020-08-10T12:22:18.230