1

背景

  • 我打算使用 S3 来存储 Flink 的检查点,使用FsStateBackend. 但不知何故,我收到了以下错误。

错误

org.apache.flink.core.fs.UnsupportedFileSystemSchemeException: Could not find a file system implementation for scheme 's3'. The scheme is not directly supported by Flink and no Hadoop file system to support this scheme could be loaded.

Flink 版本:我使用的是 Flink 1.10.0 版本。

4

1 回答 1

8

我已经找到了上述问题的解决方案,所以在这里我将其列在所需的步骤中。

脚步

  1. 我们需要在flink-conf.yaml下面列出的文件中添加一些配置。
state.backend: filesystem
state.checkpoints.dir: s3://s3-bucket/checkpoints/ #"s3://<your-bucket>/<endpoint>"
state.backend.fs.checkpointdir: s3://s3-bucket/checkpoints/ #"s3://<your-bucket>/<endpoint>"


s3.access-key: XXXXXXXXXXXXXXXXXXX #your-access-key
s3.secret-key: xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx #your-secret-key

s3.endpoint: http://127.0.0.1:9000 #your-endpoint-hostname (I have used Minio) 
  1. 完成第一步后,我们需要将相应的(flink-s3-fs-hadoop-1.10.0.jarflink-s3-fs-presto-1.10.0.jar)JAR 文件从 opt 目录复制到 Flink 的 plugins 目录。

    • 例如:--> 1.复制/flink-1.10.0/opt/flink-s3-fs-hadoop-1.10.0.jar/flink-1.10.0/plugins/s3-fs-hadoop/flink-s3-fs-hadoop-1.10.0.jar // 推荐用于 StreamingFileSink
      2.复制/flink-1.10.0/opt/flink-s3-fs-presto-1.10.0.jar/flink-1.10.0/plugins/s3-fs-presto/flink-s3-fs-presto-1.10.0.jar // 推荐用于检查点
  2. 在检查点代码中添加它

env.setStateBackend(new FsStateBackend("s3://s3-bucket/checkpoints/"))
  1. 完成上述所有步骤后,如果 Flink 已经在运行,请重新启动它。

笔记:

  • 如果你在 Flink中使用 both( flink-s3-fs-hadoopand ) 那么请专门使用 for和for而不是.flink-s3-fs-prestos3p://flink-s3-fs-prestos3a://flink-s3-fs-hadoops3://
  • 更多详情请点击这里
于 2020-10-06T13:15:57.703 回答