19

目标: 我们希望使用 AWS Glue 数据目录为驻留在 S3 存储桶中的 JSON 数据创建一个表,然后我们将通过 Redshift Spectrum 对其进行查询和解析。

背景: JSON 数据来自 DynamoDB Streams,嵌套很深。第一级 JSON 有一组一致的元素:Keys、NewImage、OldImage、SequenceNumber、ApproximateCreationDateTime、SizeBytes 和 EventName。唯一的变化是有些记录没有NewImage,有些没有OldImage。但是,在这第一级之下,模式变化很大。

理想情况下,我们希望使用 Glue 仅解析 JSON 的第一级,并且基本上将较低级别视为大型 STRING 对象(然后我们将根据需要使用 Redshift Spectrum 对其进行解析)。目前,我们正在将整个记录加载到 Redshift 中的单个 VARCHAR 列中,但记录接近 Redshift 中数据类型的最大大小(最大 VARCHAR 长度为 65535)。因此,我们希望在记录到达 Redshift 之前执行第一级解析。

到目前为止我们尝试/参考的内容:

  • 将 AWS Glue Crawler 指向 S3 存储桶会导致数百个表具有一致的顶级架构(上面列出的属性),但在 STRUCT 元素的更深层次上会出现不同的架构。我们还没有找到一种方法来创建从所有这些表中读取并将其加载到单个表中的 Glue ETL 作业。
  • 手动创建表并没有什么成果。我们尝试将每一列设置为 STRING 数据类型,但该作业未能成功加载数据(大概是因为这将涉及从 STRUCT 到 STRING 的一些转换)。将列设置为 STRUCT 时,它需要一个定义的模式 - 但这正是从一条记录到另一条记录的不同,因此我们无法提供适用于所有相关记录的通用 STRUCT 模式。
  • AWS Glue Relationalize 转换很有趣,但不是我们在这种情况下要寻找的(因为我们希望保持部分 JSON 完整,而不是完全压平它)。几周前Redshift Spectrum 支持标量 JSON数据,但这不适用于我们正在处理的嵌套 JSON。这些似乎都无法帮助处理由 Glue Crawler 创建的数百个表格。

问题: 我们如何使用 Glue(或其他方法)来允许我们仅解析这些记录的第一级 - 同时忽略顶层元素下方的不同模式 - 以便我们可以从 Spectrum 访问它或加载它身体进入红移?

我是胶水新手。我在 Glue 文档中花了很多时间,并在论坛上浏览(有些稀疏的)信息。我可能会遗漏一些明显的东西 - 或者这可能是 Glue 在其当前形式中的一个限制。欢迎任何建议。

谢谢!

4

5 回答 5

2

我不确定您是否可以使用表定义来完成此操作,但您可以通过使用映射函数将顶级值转换为 JSON 字符串来使用 ETL 作业来完成此操作。文档:[链接]

import json

# Your mapping function
def flatten(rec):
    for key in rec:
        rec[key] = json.dumps(rec[key])
    return rec

old_df = glueContext.create_dynamic_frame.from_options(
    's3',
    {"paths": ['s3://...']},
    "json")

# Apply mapping function f to all DynamicRecords in DynamicFrame
new_df = Map.apply(frame=old_df, f=flatten)

从这里您可以选择导出到 S3(可能是 Parquet 或其他一些列格式以优化查询)或根据我的理解直接导出到 Redshift,尽管我没有尝试过。

于 2018-03-26T06:25:16.687 回答
1

截至 2018 年 12 月 20 日,我能够手动将具有第一级 json 字段的表定义为类型为 STRING 的列。然后在胶水脚本中,动态框架将列作为字符串。从那里,您可以对字段Unbox执行类型操作。json这将 json 解析字段并派生真正的模式。如果您可以遍历模式列表,则结合Unbox使用Filter允许您循环并处理来自同一输入的异构 json 模式。

但是,请注意,这非常慢。我认为胶水在循环的每次迭代期间都从 s3 下载源文件。我一直在尝试找到一种方法来保存初始源数据,但它看起来像是.toDF派生了字符串 json 字段的架构,即使您将它们指定为胶水 StringType。如果我能找到性能更好的解决方案,我会在这里添加评论。

于 2018-12-20T16:52:11.613 回答
1

我发现对浅嵌套 json 有用的过程:

  1. ApplyMapping 第一级为datasource0;

  2. 在需要的地方爆炸structarray消除元素级别的对象 ;df1 = datasource0.toDF().select(id,col1,col2,...,explode(coln).alias(coln)explodefrom pyspark.sql.functions import explode

  3. 选择您希望保持完整的 JSON 对象intact_json = df1.select(id, itct1, itct2,..., itctm)

  4. 转换df1回 dynamicFrame 并对 dynamicFrame 进行关系化,并将完整的列删除dataframe.drop_fields(itct1, itct2,..., itctm)

  5. 基于“id”列将关系化表与完整表连接起来。

于 2018-10-23T17:02:24.667 回答
0

到目前为止,这是 Glue 的限制。你看过胶水分类器吗?这是我唯一还没用过的,但可能适合你的需要。您可以为字段或类似内容定义 JSON 路径。

除此之外 - 胶水工作是要走的路。后台是 Spark,所以你几乎可以做任何事情。设置一个开发端点并使用它。在过去的三周里,我遇到了各种障碍,并决定完全放弃任何和所有 Glue 功能,只放弃 Spark,这样它既便携又实际工作。

在设置开发端点时可能需要记住的一件事是 IAM 角色必须具有“/”路径,因此您很可能需要手动创建一个具有此路径的单独角色。自动创建的路径为“/service-role/”。

于 2018-03-24T14:53:53.437 回答
0

你应该添加一个胶水分类器,最好是 $[*]

当你在 s3 中爬取 json 文件时,它会读取文件的第一行。

您可以创建一个粘合作业,以便将此 json 文件的数据目录表加载到 redshift 中。

我唯一的问题是 Redshift Spectrum 在读取数据目录中的 json 表时遇到问题。

让我知道您是否找到了解决方案

于 2018-06-05T07:28:00.330 回答