3

我正在阅读未绑定的源(Kafka)并将其字数写入其他 Kafka 主题。现在我想在光束管道中执行检查点。我已按照 apache 梁文档中的所有说明进行操作,但即使在此之后也没有创建检查点目录。

以下是我用于管道的参数:-

--runner=FlinkRunner
--streaming=true
--parallelism=2
--checkpointingInterval=1000
--checkpointTimeoutMillis=5000
--minPauseBetweenCheckpoints=500
--externalizedCheckpointsEnabled=true
--retainExternalizedCheckpointsOnCancellation=true

谁能帮我检查点?

4

2 回答 2

1

我已经研究过解决方案,所以一个是您可以更改链接集群的 flink-conf.yaml 中的 checkpoint.state.dir 路径,另一个是使用 flinkPipelineOptions-

        @Description(
                "Sets the state backend factory to use in streaming mode. "
                        + "Defaults to the flink cluster's state.backend configuration.")
        Class<? extends FlinkStateBackendFactory> getStateBackendFactory();
        void setStateBackendFactory(Class<? extends FlinkStateBackendFactory> stateBackendFactory);

并通过设置 setStateBackendFactory (我已经使用自定义类)

  static class  bakend implements FlinkStateBackendFactory{

        @Override
        public StateBackend createStateBackend(FlinkPipelineOptions options) {
            return new FsStateBackend("file:///Users/myPc/word-count-beam/src/checkpoint/");

        }
    }

这将创建一个 checkpointDir 您还需要设置一个 checkpointinginterval 值才能启用检查点。

于 2020-07-27T05:12:55.283 回答
0

我知道它很旧,但想同意你的回答。我们在 2019 年构建了一个 dockerized flink,并使用这些选项进行光束和运行

--runner=FlinkRunner --streaming=true --checkpointingInterval=30000 --env=dev

我们已经在 conf.yml 中配置了 RocksDB 作为后端。

于 2021-02-10T18:59:29.720 回答