1

我有一个主键为多列的表,所以我需要在多列上执行合并逻辑


DeltaTable.forPath(spark, "path")
  .as("data")
  .merge(
    finalDf1.as("updates"),
    "data.column1 = updates.column1 AND data.column2 = updates.column2 AND data.column3 = updates.column3 AND data.column4 = updates.column4 AND data.column5 = updates.column5")
  .whenMatched
  .updateAll()
  .whenNotMatched
  .insertAll()
  .execute()

当我检查数据计数时,它没有按预期更新。

有人可以在这里帮助我吗?

4

2 回答 2

0

请尝试像此示例中的方法:https ://docs.databricks.com/_static/notebooks/merge-in-cdc.html 创建一个包含您将注意的附加列的更改表

  • 如果一行是新的(被插入)
  • 旧的(主键存在)并且没有任何改变
  • 旧的(存在主键)但其他字段需要更新,然后在合并时使用附加条件,例如:
.whenMatched("s.new = true")
.insert()
.whenMatched("s.updated = true")
.updateExpr(Map("key" -> "s.key", "value" -> "s.newValue"))
于 2021-09-10T14:22:55.577 回答
0

你怎么数你的行数?

要记住的一件事是,直接从 Delta Lake 生成的 parquet 文件中读取和计数可能会给您带来与通过 delta 表接口读取行不同的结果。请记住,delta 会保留日志并支持时间旅行,因此它确实会在行随时间变化时存储行的副本。

这是一种准确计算增量表中当前行数的方法:

deltaTable = DeltaTable.forPath(spark,<path to your delta table>)

deltaTable.toDF().count()
于 2021-10-20T18:29:06.140 回答