0

我在 Amazon KDA 上部署了一个 Apache Beam 应用程序。

它使用默认设置启用了检查点。

"FlinkApplicationConfigurationDescription": {
"CheckpointConfigurationDescription": {
"ConfigurationType": "DEFAULT",
"CheckpointingEnabled": true,
"CheckpointInterval": 60000,
"MinPauseBetweenCheckpoints": 5000
},

但在应用程序日志中,我可以看到:

“存在依赖检查点的 UnboundedSources,但检查点已禁用。”

CheckpointInterval如果我作为运行时属性传递给我的应用程序,它只会检查点。那么有必要明确地传递这些值吗?

该应用程序基本上是从 Kinesis 中读取数据,将数据窗口化为大小约为 30 秒的固定持续时间,然后将数据发布回 PubSub。

   pipeline
            .apply("Read from Kinesis",  new KinesisIORead())
            .apply("Windowing", Window.into(FixedWindows.of(Duration.standardSeconds(30))))
            .apply(WithKeys.of(DUMMY_KEY))
            .apply(GroupIntoBatches.ofSize(5))
            .apply(Values.create())
            .apply("Map values to single object", ParDo.of(new GroupedMessage()))
            .apply("Write to Pub/Sub", new PubSubWrite()));

应用程序 jar 包括:

  • 梁-sdks-java-core:2.31.0
  • 光束运行器-flink-1.11:2.31.0
  • 梁-sdks-java-io-kafka:2.31.0
4

1 回答 1

0

似乎它org.apache.flink.streaming.api.environment.StreamExecutionEnvironment没有获取 AWS UI 中设置的更改,因此在调用getCheckpointConfig().isCheckpointingEnabled()它时声称未启用检查点。我会明确地传递这些以确保它正常工作。

于 2021-10-04T19:42:10.423 回答