2

目前的设置:

  • 带有 json 文件的 S3 位置。所有文件都存储在同一位置(无日/月/年结构)。

  • Glue Crawler 读取目录表中的数据

  • Glue ETL 作业将数据转换并存储到 s3 中的 parquet 表中
  • Glue Crawler 从 s3 parquet 表中读取数据并将其存储到 Athena 查询的新表中

我想要实现的是按天(1)分区的镶木地板表和 1 天的镶木地板表在同一个文件中(2)。目前每个 json 文件都有一个 parquet 表。

我该怎么办?

值得一提的是,数据中有一个 datetime 列,但它是一个 unix 纪元时间戳。我可能需要将其转换为“年/月/日”格式,否则我假设它将再次为每个文件创建一个分区。

非常感谢你的帮助!!

4

3 回答 3

7

将 Glue 的 DynamicFrame 转换为 Spark 的 DataFrame 以添加年/月/日列并重新分区。将分区减少到一个将确保只有一个文件将写入文件夹,但可能会降低作业性能。

这是python代码:

from pyspark.sql.functions import col,year,month,dayofmonth,to_date,from_unixtime

...

df = dynamicFrameSrc.toDF()

repartitioned_with_new_columns_df = df
    .withColumn(“date_col”, to_date(from_unixtime(col(“unix_time_col”))))
    .withColumn(“year”, year(col(“date_col”)))
    .withColumn(“month”, month(col(“date_col”)))
    .withColumn(“day”, dayofmonth(col(“date_col”)))
    .drop(col(“date_col”))
    .repartition(1)

dyf = DynamicFrame.fromDF(repartitioned_with_new_columns_df, glueContext, "enriched")

datasink = glueContext.write_dynamic_frame.from_options(
    frame = dyf, 
    connection_type = "s3", 
    connection_options = {
        "path": "s3://yourbucket/data”, 
        "partitionKeys": [“year”, “month”, “day”]
    }, 
    format = “parquet”, 
    transformation_ctx = "datasink"
)

请注意,from pyspark.qsl.functions import col可以给出参考错误,这不应该是这里解释的问题。

于 2019-08-10T22:36:04.290 回答
3

我无法发表评论,所以我要写一个答案。

我使用了 Yuriy 的代码和一些需要调整的地方:

  • 缺少括号

df = dynamicFrameSrc.toDF()

  • 在 toDF() 之后我必须添加,select("*")否则架构为空

df.select("*") .withColumn(“date_col”, to_date(from_unixtime(col(“unix_time_col”))))

于 2019-10-29T08:15:42.360 回答
1

要在 AWS Glue Studio 中实现此目的:

您将需要创建一个自定义函数来将日期时间字段转换为日期。有一个额外的步骤是将其转换回 DynamicFrameCollection。

在 Python 中:

def MyTransform(glueContext, dfc) -> DynamicFrameCollection:
    df = dfc.select(list(dfc.keys())[0]).toDF()
    df_with_date = df.withColumn('date_field', df['datetime_field'].cast('date'))
    glue_df = DynamicFrame.fromDF(df_with_date, glueContext, "transform_date")
    return(DynamicFrameCollection({"CustomTransform0": glue_df}, glueContext))

然后,您必须编辑自定义转换器架构以包含您刚刚创建的新日期字段。

然后,您可以使用“数据目标”节点将数据写入磁盘,然后选择新的日期字段用作分区。

视频逐步演练

于 2020-12-24T16:50:54.500 回答