在我们的数据管道中,我们从数据源中提取 CDC 事件,并将这些更改以 AVRO 格式写入“增量数据”文件夹。
然后定期运行 Spark 作业,将这个“增量数据”与我们当前版本的“快照表”(ORC 格式)合并,以获得最新版本的上游快照。
在此合并逻辑期间:
1)我们将“增量数据”加载为 DataFrame df1
2) 将当前的“快照表”加载为 DataFrame df2
3) 合并 df1 和 df2 去重 id 并获取最新版本的行(使用 update_timestamp 列)
此逻辑将“增量数据”和当前“快照表”的全部数据加载到 Spark 内存中,这取决于数据库可能非常巨大。
我注意到在 Delta Lake 中,使用以下代码完成了类似的操作:
import io.delta.tables._
import org.apache.spark.sql.functions._
val updatesDF = ... // define the updates DataFrame[date, eventId, data]
DeltaTable.forPath(spark, "/data/events/")
.as("events")
.merge(
updatesDF.as("updates"),
"events.eventId = updates.eventId")
.whenMatched
.updateExpr(
Map("data" -> "updates.data"))
.whenNotMatched
.insertExpr(
Map(
"date" -> "updates.date",
"eventId" -> "updates.eventId",
"data" -> "updates.data"))
.execute()
在这里,“updatesDF”可以被认为是来自 CDC 源的“增量数据”。
我的问题:
1)合并/更新插入如何在内部工作?它是否将整个“updatedDF”和“/data/events/”加载到 Spark 内存中?
2)如果不是,它是否应用类似于 Apache Hudi 的增量更改?
3) 在重复数据删除期间,这个 upsert 逻辑如何知道获取最新版本的记录?因为我没有看到任何指定“更新时间戳”列的设置?