我已经研究过解决方案,所以一个是您可以更改链接集群的 flink-conf.yaml 中的 checkpoint.state.dir 路径,另一个是使用 flinkPipelineOptions-
@Description(
"Sets the state backend factory to use in streaming mode. "
+ "Defaults to the flink cluster's state.backend configuration.")
Class<? extends FlinkStateBackendFactory> getStateBackendFactory();
void setStateBackendFactory(Class<? extends FlinkStateBackendFactory> stateBackendFactory);
并通过设置 setStateBackendFactory (我已经使用自定义类)
static class bakend implements FlinkStateBackendFactory{
@Override
public StateBackend createStateBackend(FlinkPipelineOptions options) {
return new FsStateBackend("file:///Users/myPc/word-count-beam/src/checkpoint/");
}
}
这将创建一个 checkpointDir 您还需要设置一个 checkpointinginterval 值才能启用检查点。