0

当 ETL 作业运行时,它会正确执行,但由于表没有时间戳,因此在运行相同的 ETL 作业时会复制数据。如何使用 Upsert 执行分段并解决此问题,或者如果您有任何其他问题欢迎您回答。如何我要摆脱这个问题吗?我找到的解决方案是在其中包含时间戳或进行分期,还是有其他方法?

4

2 回答 2

0

U 可以overwrite在向 s3 写入数据时使用。它将替换原始数据

于 2019-01-30T17:53:05.307 回答
0

为了防止 s3 上的重复,您需要从目标加载数据并在保存之前过滤掉现有记录:

val deltaDf = newDataDf.alias("new")
  .join(existingDf.alias("existing"), "id", "left_outer")
  .where(col("existing.id").isNull)
  .select("new.*")

glueContext.getSinkWithFormat(
    connectionType = "s3",
    options = JsonOptions(Map(
      "path" -> path
    )),
    transformationContext = "save_to_s3"
    format = "avro"
  ).writeDynamicFrame(DynamicFrame(deltaDf, glueContext))

但是,此方法不会覆盖更新的记录。

另一种选择是使用一些字段保存更新的记录updated_at,下游消费者可以使用这些字段来获取最新值。

您还可以考虑在每次运行作业时将数据集转储到单独的文件夹中(即,每天您都有完整的数据转储data/dataset_date=<year-month-day>

import org.apache.spark.sql.functions._

val datedDf = sourceDf.withColumn("dataset_date", current_date())

glueContext.getSinkWithFormat(
    connectionType = "s3",
    options = JsonOptions(Map(
      "path" -> path,
      "partitionKeys" -> Array("dataset_date")
    )),
    transformationContext = "save_to_s3"
    format = "avro"
  ).writeDynamicFrame(DynamicFrame(datedDf, glueContext))
于 2019-01-31T14:57:11.373 回答