我已经找到了上述问题的解决方案,所以在这里我将其列在所需的步骤中。
脚步
- 我们需要在
flink-conf.yaml
下面列出的文件中添加一些配置。
state.backend: filesystem
state.checkpoints.dir: s3://s3-bucket/checkpoints/ #"s3://<your-bucket>/<endpoint>"
state.backend.fs.checkpointdir: s3://s3-bucket/checkpoints/ #"s3://<your-bucket>/<endpoint>"
s3.access-key: XXXXXXXXXXXXXXXXXXX #your-access-key
s3.secret-key: xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx #your-secret-key
s3.endpoint: http://127.0.0.1:9000 #your-endpoint-hostname (I have used Minio)
完成第一步后,我们需要将相应的(flink-s3-fs-hadoop-1.10.0.jar
和flink-s3-fs-presto-1.10.0.jar
)JAR 文件从 opt 目录复制到 Flink 的 plugins 目录。
- 例如:--> 1.复制
/flink-1.10.0/opt/flink-s3-fs-hadoop-1.10.0.jar
到/flink-1.10.0/plugins/s3-fs-hadoop/flink-s3-fs-hadoop-1.10.0.jar
// 推荐用于 StreamingFileSink
2.复制/flink-1.10.0/opt/flink-s3-fs-presto-1.10.0.jar
到/flink-1.10.0/plugins/s3-fs-presto/flink-s3-fs-presto-1.10.0.jar
// 推荐用于检查点
在检查点代码中添加它
env.setStateBackend(new FsStateBackend("s3://s3-bucket/checkpoints/"))
- 完成上述所有步骤后,如果 Flink 已经在运行,请重新启动它。
笔记:
- 如果你在 Flink中使用 both(
flink-s3-fs-hadoop
and ) 那么请专门使用 for和for而不是.flink-s3-fs-presto
s3p://
flink-s3-fs-presto
s3a://
flink-s3-fs-hadoop
s3://
- 更多详情请点击这里。