12

需要一种优雅的方式将 Delta Lake 回滚到以前的版本。

我目前的方法如下:

import io.delta.tables._

val deltaTable = DeltaTable.forPath(spark, testFolder)

spark.read.format("delta")
  .option("versionAsOf", 0)
  .load(testFolder)
  .write
  .mode("overwrite")
  .format("delta")
  .save(testFolder)

但这很丑陋,因为需要重写整个数据集。似乎一些元更新就足够了,不需要数据 I/O。有人知道更好的方法吗?

4

5 回答 5

8

从 Delta Lake 0.7.0 开始,您可以使用RESTORE命令回滚到 Delta Lake 表的早期版本。这是使用时间旅行来回滚表格的一种更简单的方法。

斯卡拉:

import io.delta.tables._

val deltaTable = DeltaTable.forPath(spark, "/path/to/delta-table")

deltaTable.restoreToVersion(0)

Python:

from delta.tables import *

deltaTable = DeltaTable.forPath(spark, "/path/to/delta-table")

deltaTable.restoreToVersion(0)

SQL

RESTORE TABLE delta.`/path/to/delta-table` TO VERSION AS OF 0

restoreToTimestamp如果您更愿意以这种方式做事,也可以使用该命令。阅读文档以获取更多详细信息。

于 2020-12-01T23:43:08.057 回答
4

这是一个残酷的解决方案。这并不理想,但考虑到用分区覆盖大型数据集可能会很昂贵,这个简单的解决方案可能会有所帮助。

如果您对所需回滚时间之后的更新不是很敏感,只需删除 _delta_log 中晚于回滚时间的所有版本文件。未引用的文件可以稍后使用真空释放。

保留完整历史记录的另一种解决方案是 1) deltaTable.delete2) 将所有日志依次复制到回滚(版本号增加)到删除日志文件的末尾。这模仿了在回滚日期之前创建三角洲湖的过程。但它肯定不漂亮。

于 2019-08-26T23:22:48.470 回答
2

如果您的目标是修复错误数据并且您对更新不是很敏感,则可以更换一个时间间隔。

 df.write
      .format("delta")
      .mode("overwrite")
      .option("replaceWhere", "date >= '2017-01-01' AND date <= '2017-01-31'")
      .save("/delta/events")
于 2019-09-26T10:48:08.923 回答
1

我在 Delta 遇到过类似的问题,我在 1 个事务中调用了多个 dml 操作。例如,我需要调用合并,然后在 1 个单一事务中删除。因此,在这种情况下,它们要么必须一起成功,要么在其中任何一个失败时回滚状态。

为了解决这个问题,我在事务开始之前备份了_delta_log(我们称之为稳定状态)目录。如果事务中的两个 DML 操作都成功,则丢弃先前的稳定状态并使用 _delta_log 中提交的新状态,以防任何 dml 操作失败,则只需将 _delta_log 目录替换为您之前进行备份的稳定状态开始交易。一旦替换为稳定状态,然后只需运行真空以删除可能在事务期间创建的陈旧文件。

于 2020-05-11T02:15:16.627 回答
1

您应该使用时间旅行功能:https ://databricks.com/blog/2019/02/04/introducing-delta-time-travel-for-large-scale-data-lakes.html

您以时间戳读取数据:

val inputPath = "/path/to/my/table@20190101000000000"

然后用“回滚”版本覆盖现有数据。

关于它是丑陋的,我不确定我能帮上什么忙。您可以使用分区来限制数据。或者您可以计算出哪些记录已更改并仅覆盖它们。

于 2019-08-27T15:07:18.417 回答