4

在我们的数据管道中,我们从数据源中提取 CDC 事件,并将这些更改以 AVRO 格式写入“增量数据”文件夹。

然后定期运行 Spark 作业,将这个“增量数据”与我们当前版本的“快照表”(ORC 格式)合并,以获得最新版本的上游快照。

在此合并逻辑期间:

1)我们将“增量数据”加载为 DataFrame df1

2) 将当前的“快照表”加载为 DataFrame df2

3) 合并 df1 和 df2 去重 id 并获取最新版本的行(使用 update_timestamp 列)

此逻辑将“增量数据”和当前“快照表”的全部数据加载到 Spark 内存中,这取决于数据库可能非常巨大。

我注意到在 Delta Lake 中,使用以下代码完成了类似的操作:

import io.delta.tables._
import org.apache.spark.sql.functions._

val updatesDF = ...  // define the updates DataFrame[date, eventId, data]

DeltaTable.forPath(spark, "/data/events/")
  .as("events")
  .merge(
    updatesDF.as("updates"),
    "events.eventId = updates.eventId")
  .whenMatched
  .updateExpr(
    Map("data" -> "updates.data"))
  .whenNotMatched
  .insertExpr(
    Map(
      "date" -> "updates.date",
      "eventId" -> "updates.eventId",
      "data" -> "updates.data"))
  .execute()

在这里,“updatesDF”可以被认为是来自 CDC 源的“增量数据”。

我的问题

1)合并/更新插入如何在内部工作?它是否将整个“updatedDF”和“/data/events/”加载到 Spark 内存中?

2)如果不是,它是否应用类似于 Apache Hudi 的增量更改?

3) 在重复数据删除期间,这个 upsert 逻辑如何知道获取最新版本的记录?因为我没有看到任何指定“更新时间戳”列的设置?

4

1 回答 1

3
   1) How does merge/upsert internally works? Does it load entire "updatedDF" and 
   "/data/events/" into Spark memory?

不,Spark 不需要将需要更新的整个 Delta DF 加载到内存中。否则它将无法扩展。它采用的方法与 Spark 所做的其他工作非常相似——如果数据集足够大(或者您在云中创建显式分区),则整个表会透明地分成多个分区。然后为每个分区分配一个构成您的工作的merge任务。任务可以在不同的 Spark 执行器等上运行。

   2) If not, does it apply the delta changes something similar to Apache Hudi ?

我听说过 Apache Hudi,但还没有看过。在内部,Delta看起来像版本化的镶木地板文件。对表的更改存储为有序的原子单元,称为提交。当你保存一个表时——看看它有什么文件——它会有 000000.json、000001.json 等文件,每个文件都会引用子目录中底层 parquet 文件的一组操作。例如,000000.json 会说这个版本在时间上引用了 parquet 文件 001 和 002,而 000001.json 会说这个版本在时间上不应该引用这两个旧的 parquet 文件,而只使用 parquet 文件 003。

   3) During deduplication how this upsert logic knows to take the latest version of a record? 
Because I don't see any setting to specify the "update timestamp" column?

默认情况下,它引用最新的变更集。时间戳是如何在 Delta 中实现此版本控制的内部。您可以通过AS OF语法引用较旧的快照 - 请参阅 https://docs.databricks.com/delta/delta-batch.html#syntax

于 2019-12-27T23:55:37.100 回答