4

我在 AWS 中设置了 ETL 管道,如下所示

input_rawdata -> s3 -> lambda -> 触发 spark etl 脚本(通过 aws 胶水)-> 输出(s3,parquet 文件)

我的问题是假设以上是数据的初始加载,我如何设置运行每天(或每小时)来的增量批处理,以添加新行或更新现有记录

a.) 我如何继续附加到相同的 s3 parquet 文件。以便后续的 presto db 查询产生最新的数据。

b.) 如何处理重复记录获取查询的最新时间戳。

在 spark 脚本中,我是否需要创建源为 s3 的 Hive 外部表并在 presto db 中使用?

感谢您的任何投入。

4

5 回答 5

2

Apache Hudi would be a great tool for that: https://hudi.incubator.apache.org/ You can do upserts to tables with data stored in S3 in Parquet format and Presto is compatible with it. For example with EMR 5.28 Hudi is installed already and you can query Hudi datasets with Hive, Spark and Presto.

于 2019-12-21T21:25:27.817 回答
1

您可以在 ETL 作业中定义作业书签。

书签会跟踪 s3 文件的处理,因此一旦处理了历史负载,并且如果您将新文件转储到 s3 上,则只有新文件将由 etl 作业处理并将这些文件标记为内部处理。

您可以通过这种方式处理增量数据。

于 2017-11-21T14:34:46.053 回答
0

首先不要尝试追加到 s3 中存在的文件,而是创建具有多条记录的文件。

要查询 s3 进行分析,您可以使用AWS Athena描述数据,其中其数据目录与 Hive 元存储兼容。

要删除重复项,您可以通过 Athena 编写类似 SQL 的查询来查询唯一记录集。

于 2017-09-06T09:32:04.827 回答
0

您现在可以使用Delta使用 spark 对您的数据进行更新插入、附加和增量,请参阅此。该工具可让您以“增量”格式(Spark + 元数据文件)写入数据。您甚至可以将数据恢复或查询到某个时间点。请注意,它最近没有完全与 Athena/Presto(开源)一起工作,因为您需要创建清单(但正在修复中)。

于 2019-12-30T16:19:06.223 回答
0

由于您已经在使用 Lambda 和 Glue,您可以使用 Kinesis 和 KCL 将数据捕获为流,或者使用 Spark Streaming,因为您有一个 spark 脚本。这些选项中的任何一个都会为您提供您正在寻找的增量输入。使用实时流式传输到现有数据库时,数据损坏的风险较小。

然后,您可以使用 Glue 通过您的 ETL 流程传递数据。您可以在 Glue 中安排或链接您的 ETL 作业,它可以将转换后的数据加载到 AWS 存储桶中。Glue 是面向批处理的,但最小间隔为 5 分钟,并且通过 Kinesis 执行第一步,然后将完成的数据传递给 Glue,您仍然可以进行增量更新。您可以查看此内容以获取有关ETL 架构的其他资源和想法。

对于任何重复数据,如有必要,您可以对已完成的数据集运行 SQL 查询。

于 2017-11-27T09:07:14.667 回答