我在 Amazon KDA 上部署了一个 Apache Beam 应用程序。
它使用默认设置启用了检查点。
"FlinkApplicationConfigurationDescription": {
"CheckpointConfigurationDescription": {
"ConfigurationType": "DEFAULT",
"CheckpointingEnabled": true,
"CheckpointInterval": 60000,
"MinPauseBetweenCheckpoints": 5000
},
但在应用程序日志中,我可以看到:
“存在依赖检查点的 UnboundedSources,但检查点已禁用。”
CheckpointInterval
如果我作为运行时属性传递给我的应用程序,它只会检查点。那么有必要明确地传递这些值吗?
该应用程序基本上是从 Kinesis 中读取数据,将数据窗口化为大小约为 30 秒的固定持续时间,然后将数据发布回 PubSub。
pipeline
.apply("Read from Kinesis", new KinesisIORead())
.apply("Windowing", Window.into(FixedWindows.of(Duration.standardSeconds(30))))
.apply(WithKeys.of(DUMMY_KEY))
.apply(GroupIntoBatches.ofSize(5))
.apply(Values.create())
.apply("Map values to single object", ParDo.of(new GroupedMessage()))
.apply("Write to Pub/Sub", new PubSubWrite()));
应用程序 jar 包括:
- 梁-sdks-java-core:2.31.0
- 光束运行器-flink-1.11:2.31.0
- 梁-sdks-java-io-kafka:2.31.0