11

有人成功使用 Apache Flink 0.9 处理存储在 AWS S3 上的数据吗?我发现他们使用的是自己的 S3FileSystem 而不是来自 Hadoop 的一个......而且看起来它不起作用。我将以下路径放入 s3://bucket.s3.amazonaws.com/folder 失败,但出现以下异常:

java.io.IOException:无法建立与 Amazon S3 的连接:com.amazonaws.services.s3.model.AmazonS3Exception:我们计算的请求签名与您提供的签名不匹配。检查您的密钥和签名方法。(服务:Amazon S3;状态码:403;

4

2 回答 2

9

2016 年 5 月更新:Flink 文档现在有一个关于如何将 Flink 与 AWS 结合使用的页面


Flink 用户邮件列表上也有人问过这个问题,我已经在那里回答了:http: //apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Processing-S3-数据与 Apache-Flink-td3046.html

tl;博士:

Flink 程序

public class S3FileSystem {
   public static void main(String[] args) throws Exception {
      ExecutionEnvironment ee = ExecutionEnvironment.createLocalEnvironment();
      DataSet<String> myLines = ee.readTextFile("s3n://my-bucket-name/some-test-file.xml");
      myLines.print();
   }
}

将以下内容添加到core-site.xml并使其可用于 Flink:

<property>
    <name>fs.s3n.awsAccessKeyId</name>
    <value>putKeyHere</value>
</property>

<property>
    <name>fs.s3n.awsSecretAccessKey</name>
    <value>putSecretHere</value>
</property>
<property>
    <name>fs.s3n.impl</name>
    <value>org.apache.hadoop.fs.s3native.NativeS3FileSystem</value>
</property>
于 2015-10-06T02:15:59.020 回答
-1

您可以从 CloudFormation 模板的输出部分中指定的 S3 存储桶中检索工件。即Flink运行时启动运行后,可以将打车流处理器程序提交给Flink运行时,开始对Amazon Kinesis流中的行程事件进行实时分析。

$ aws s3 cp s3://«artifact-bucket»/artifacts/flink-taxi-stream-processor-1.0.jar .
$ flink run -p 8 flink-taxi-stream-processor-1.0.jar --region «AWS region» --stream «Kinesis stream name» --es-endpoint https://«Elasticsearch endpoint»

上述两个命令都使用 Amazon 的 S3 作为源,您必须相应地指定工件名称。

注意:您可以点击下面的链接并使用 EMR 和 S3 存储桶创建管道。

https://aws.amazon.com/blogs/big-data/build-a-real-time-stream-processing-pipeline-with-apache-flink-on-aws/

于 2017-07-25T09:45:44.193 回答