I have a Flink v1.2 setup with 3 JobManagers and 2 TaskManagers. I want to use an S3 bucket instead of HDFS for the state backend and checkpoints, and for the ZooKeeper storageDir:
fs.s3.accessKey: [accessKey]
fs.s3.secretKey: [secretKey]
state.backend: filesystem
state.backend.fs.checkpointdir: s3:///[bucket]/flink-checkpoints
state.checkpoints.dir: s3:///[bucket]/external-checkpoints
high-availability: zookeeper
high-availability.zookeeper.storageDir: s3:///[bucket]/recovery
In the JobManager log, I have:
2017-03-22 09:52:40,971 ERROR org.apache.flink.runtime.jobmanager.JobManager - Error while starting up JobManager
java.io.IOException: No file system found with scheme s3, referenced in file URI 's3:///[bucket]/recovery/blob'.
at org.apache.flink.core.fs.FileSystem.getUnguardedFileSystem(FileSystem.java:276)
at org.apache.flink.core.fs.FileSystem.get(FileSystem.java:310)
at org.apache.flink.runtime.blob.FileSystemBlobStore.<init>(FileSystemBlobStore.java:67)
at org.apache.flink.runtime.blob.BlobServer.<init>(BlobServer.java:114)
at org.apache.flink.runtime.jobmanager.JobManager$.createJobManagerComponents(JobManager.scala:2488)
at org.apache.flink.runtime.jobmanager.JobManager$.startJobManagerActors(JobManager.scala:2643)
at org.apache.flink.runtime.jobmanager.JobManager$.startJobManagerActors(JobManager.scala:2595)
at org.apache.flink.runtime.jobmanager.JobManager$.startActorSystemAndJobManagerActors(JobManager.scala:2242)
at org.apache.flink.runtime.jobmanager.JobManager$.liftedTree3$1(JobManager.scala:2020)
at org.apache.flink.runtime.jobmanager.JobManager$.runJobManager(JobManager.scala:2019)
at org.apache.flink.runtime.jobmanager.JobManager$$anonfun$2.apply$mcV$sp(JobManager.scala:2098)
at org.apache.flink.runtime.jobmanager.JobManager$$anonfun$2.apply(JobManager.scala:2076)
at org.apache.flink.runtime.jobmanager.JobManager$$anonfun$2.apply(JobManager.scala:2076)
at scala.util.Try$.apply(Try.scala:192)
at org.apache.flink.runtime.jobmanager.JobManager$.retryOnBindException(JobManager.scala:2131)
at org.apache.flink.runtime.jobmanager.JobManager$.runJobManager(JobManager.scala:2076)
at org.apache.flink.runtime.jobmanager.JobManager$$anon$9.call(JobManager.scala:1971)
at org.apache.flink.runtime.jobmanager.JobManager$$anon$9.call(JobManager.scala:1969)
at org.apache.flink.runtime.security.HadoopSecurityContext$1.run(HadoopSecurityContext.java:43)
at java.security.AccessController.doPrivileged(Native Method)
at javax.security.auth.Subject.doAs(Subject.java:422)
at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1548)
at org.apache.flink.runtime.security.HadoopSecurityContext.runSecured(HadoopSecurityContext.java:40)
at org.apache.flink.runtime.jobmanager.JobManager$.main(JobManager.scala:1969)
at org.apache.flink.runtime.jobmanager.JobManager.main(JobManager.scala)
2017-03-22 09:52:40,981 WARN org.apache.hadoop.security.UserGroupInformation - PriviledgedActionException as:ubuntu (auth:SIMPLE) cause:java.io.IOException: No file system found with scheme s3, referenced in file URI 's3:///[bucket]/recovery/blob'.
2017-03-22 09:52:40,981 ERROR org.apache.flink.runtime.jobmanager.JobManager - Failed to run JobManager.
java.io.IOException: No file system found with scheme s3, referenced in file URI 's3:///[bucket]/recovery/blob'.
at org.apache.flink.core.fs.FileSystem.getUnguardedFileSystem(FileSystem.java:276)
at org.apache.flink.core.fs.FileSystem.get(FileSystem.java:310)
at org.apache.flink.runtime.blob.FileSystemBlobStore.<init>(FileSystemBlobStore.java:67)
at org.apache.flink.runtime.blob.BlobServer.<init>(BlobServer.java:114)
at org.apache.flink.runtime.jobmanager.JobManager$.createJobManagerComponents(JobManager.scala:2488)
at org.apache.flink.runtime.jobmanager.JobManager$.startJobManagerActors(JobManager.scala:2643)
at org.apache.flink.runtime.jobmanager.JobManager$.startJobManagerActors(JobManager.scala:2595)
at org.apache.flink.runtime.jobmanager.JobManager$.startActorSystemAndJobManagerActors(JobManager.scala:2242)
at org.apache.flink.runtime.jobmanager.JobManager$.liftedTree3$1(JobManager.scala:2020)
at org.apache.flink.runtime.jobmanager.JobManager$.runJobManager(JobManager.scala:2019)
at org.apache.flink.runtime.jobmanager.JobManager$$anonfun$2.apply$mcV$sp(JobManager.scala:2098)
at org.apache.flink.runtime.jobmanager.JobManager$$anonfun$2.apply(JobManager.scala:2076)
at org.apache.flink.runtime.jobmanager.JobManager$$anonfun$2.apply(JobManager.scala:2076)
at scala.util.Try$.apply(Try.scala:192)
at org.apache.flink.runtime.jobmanager.JobManager$.retryOnBindException(JobManager.scala:2131)
at org.apache.flink.runtime.jobmanager.JobManager$.runJobManager(JobManager.scala:2076)
at org.apache.flink.runtime.jobmanager.JobManager$$anon$9.call(JobManager.scala:1971)
at org.apache.flink.runtime.jobmanager.JobManager$$anon$9.call(JobManager.scala:1969)
at org.apache.flink.runtime.security.HadoopSecurityContext$1.run(HadoopSecurityContext.java:43)
at java.security.AccessController.doPrivileged(Native Method)
at javax.security.auth.Subject.doAs(Subject.java:422)
at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1548)
at org.apache.flink.runtime.security.HadoopSecurityContext.runSecured(HadoopSecurityContext.java:40)
at org.apache.flink.runtime.jobmanager.JobManager$.main(JobManager.scala:1969)
at org.apache.flink.runtime.jobmanager.JobManager.main(JobManager.scala)
2017-03-22 09:52:40,983 INFO akka.remote.RemoteActorRefProvider$RemotingTerminator - Shutting down remote daemon.
2017-03-22 09:52:40,993 INFO akka.remote.RemoteActorRefProvider$RemotingTerminator - Remote daemon shut down; proceeding with flushing remote transports.
I don't have Hadoop installed. I'm not sure whether it is needed and, if so, how and where it should be installed and configured.
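EDIT: I configured Flink with the Hadoop XML below (core-site.xml). I didn't really understand the IAM part, and I'm not using EMR; I installed the cluster myself (in AWS) so that I can update Flink without depending on an image: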
<configuration>
<property>
<name>fs.defaultFS</name>
<value>s3://[ bucket ] </value>
</property>
<property>
<name>fs.s3a.access.key</name>
<description>[ Access Key ]</description>
</property>
<property>
<name>fs.s3a.secret.key</name>
<description>[ Secret Key ]</description>
</property>
<property>
<name>fs.s3.awsAccessKeyId</name>
<description>[ Access Key ]</description>
</property>
<property>
<name>fs.s3.awsSecretAccessKey</name>
<description>[ Secret Key ]</description>
</property>
<property>
<name>fs.s3n.awsAccessKeyId</name>
<value>[ Access Key ]</value>
</property>
<property>
<name>fs.s3n.awsSecretAccessKey</name>
<value>[ Secret Key ]</value>
</property>
<property>
<name>fs.s3.impl</name>
<value>org.apache.hadoop.fs.s3a.S3AFileSystem</value>
</property>
<!-- Comma separated list of local directories used to buffer
large results prior to transmitting them to S3. -->
<property>
<name>fs.s3.buffer.dir</name>
<value>/tmp</value>
</property>
</configuration>
I get this error:
2017-03-24 11:20:17,760 ERROR org.apache.flink.runtime.jobmanager.JobManager - Error while starting up JobManager
com.amazonaws.AmazonClientException: Unable to load AWS credentials from any provider in the chain
at com.amazonaws.auth.AWSCredentialsProviderChain.getCredentials(AWSCredentialsProviderChain.java:117)
at com.amazonaws.services.s3.AmazonS3Client.invoke(AmazonS3Client.java:3521)
at com.amazonaws.services.s3.AmazonS3Client.headBucket(AmazonS3Client.java:1031)
at com.amazonaws.services.s3.AmazonS3Client.doesBucketExist(AmazonS3Client.java:994)
at org.apache.hadoop.fs.s3a.S3AFileSystem.initialize(S3AFileSystem.java:297)
at org.apache.flink.runtime.fs.hdfs.HadoopFileSystem.initialize(HadoopFileSystem.java:303)
at org.apache.flink.core.fs.FileSystem.getUnguardedFileSystem(FileSystem.java:271)
at org.apache.flink.core.fs.FileSystem.get(FileSystem.java:310)
at org.apache.flink.runtime.blob.FileSystemBlobStore.<init>(FileSystemBlobStore.java:67)
at org.apache.flink.runtime.blob.BlobServer.<init>(BlobServer.java:114)
at org.apache.flink.runtime.jobmanager.JobManager$.createJobManagerComponents(JobManager.scala:2488)
at org.apache.flink.runtime.jobmanager.JobManager$.startJobManagerActors(JobManager.scala:2643)
at org.apache.flink.runtime.jobmanager.JobManager$.startJobManagerActors(JobManager.scala:2595)
at org.apache.flink.runtime.jobmanager.JobManager$.startActorSystemAndJobManagerActors(JobManager.scala:2242)
at org.apache.flink.runtime.jobmanager.JobManager$.liftedTree3$1(JobManager.scala:2020)
at org.apache.flink.runtime.jobmanager.JobManager$.runJobManager(JobManager.scala:2019)
at org.apache.flink.runtime.jobmanager.JobManager$$anonfun$2.apply$mcV$sp(JobManager.scala:2098)
at org.apache.flink.runtime.jobmanager.JobManager$$anonfun$2.apply(JobManager.scala:2076)
at org.apache.flink.runtime.jobmanager.JobManager$$anonfun$2.apply(JobManager.scala:2076)
at scala.util.Try$.apply(Try.scala:192)
at org.apache.flink.runtime.jobmanager.JobManager$.retryOnBindException(JobManager.scala:2131)
at org.apache.flink.runtime.jobmanager.JobManager$.runJobManager(JobManager.scala:2076)
at org.apache.flink.runtime.jobmanager.JobManager$$anon$9.call(JobManager.scala:1971)
at org.apache.flink.runtime.jobmanager.JobManager$$anon$9.call(JobManager.scala:1969)
at org.apache.flink.runtime.security.HadoopSecurityContext$1.run(HadoopSecurityContext.java:43)
at java.security.AccessController.doPrivileged(Native Method)
at javax.security.auth.Subject.doAs(Subject.java:422)
at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1548)
at org.apache.flink.runtime.security.HadoopSecurityContext.runSecured(HadoopSecurityContext.java:40)
at org.apache.flink.runtime.jobmanager.JobManager$.main(JobManager.scala:1969)
at org.apache.flink.runtime.jobmanager.JobManager.main(JobManager.scala)
EDIT: My mistake was that I put the keys in the description field instead of the value field.
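For anyone who hits the same thing, a quick check along these lines might have caught it. This is only a sketch: the function name `properties_missing_value` and the shortened sample XML are made up for illustration and are not part of Flink or Hadoop. It simply lists every `<property>` that has no non-empty `<value>` child.

```python
import xml.etree.ElementTree as ET


def properties_missing_value(xml_text):
    """Return the names of <property> elements that lack a non-empty <value>."""
    root = ET.fromstring(xml_text)
    missing = []
    for prop in root.iter("property"):
        name = (prop.findtext("name") or "").strip()
        value = prop.findtext("value")  # None when there is no <value> child
        if value is None or not value.strip():
            missing.append(name)
    return missing


# Hypothetical, shortened sample that mirrors the mistake:
# the first key sits in <description> instead of <value>.
SAMPLE = """<configuration>
  <property>
    <name>fs.s3a.access.key</name>
    <description>[ Access Key ]</description>
  </property>
  <property>
    <name>fs.s3n.awsAccessKeyId</name>
    <value>[ Access Key ]</value>
  </property>
</configuration>"""

if __name__ == "__main__":
    for name in properties_missing_value(SAMPLE):
        print("property without <value>:", name)
```

To run it against a real file, read the contents of core-site.xml into a string and pass that to the function instead of `SAMPLE`.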