当 ETL 作业运行时,它会正确执行,但由于表没有时间戳,因此在运行相同的 ETL 作业时会复制数据。如何使用 Upsert 执行分段并解决此问题,或者如果您有任何其他问题欢迎您回答。如何我要摆脱这个问题吗?我找到的解决方案是在其中包含时间戳或进行分期,还是有其他方法?
问问题
726 次
2 回答
0
U 可以overwrite
在向 s3 写入数据时使用。它将替换原始数据
于 2019-01-30T17:53:05.307 回答
0
为了防止 s3 上的重复,您需要从目标加载数据并在保存之前过滤掉现有记录:
val deltaDf = newDataDf.alias("new")
.join(existingDf.alias("existing"), "id", "left_outer")
.where(col("existing.id").isNull)
.select("new.*")
glueContext.getSinkWithFormat(
connectionType = "s3",
options = JsonOptions(Map(
"path" -> path
)),
transformationContext = "save_to_s3"
format = "avro"
).writeDynamicFrame(DynamicFrame(deltaDf, glueContext))
但是,此方法不会覆盖更新的记录。
另一种选择是使用一些字段保存更新的记录updated_at
,下游消费者可以使用这些字段来获取最新值。
您还可以考虑在每次运行作业时将数据集转储到单独的文件夹中(即,每天您都有完整的数据转储data/dataset_date=<year-month-day>
)
import org.apache.spark.sql.functions._
val datedDf = sourceDf.withColumn("dataset_date", current_date())
glueContext.getSinkWithFormat(
connectionType = "s3",
options = JsonOptions(Map(
"path" -> path,
"partitionKeys" -> Array("dataset_date")
)),
transformationContext = "save_to_s3"
format = "avro"
).writeDynamicFrame(DynamicFrame(datedDf, glueContext))
于 2019-01-31T14:57:11.373 回答