1

我有一个笔记本,我正在使用它来加载历史记录。每次加载 6 个月的数据,从2018-10-01. 我的增量文件按 calendar_date 分区

初始加载后,我能够读取增量文件并查看数据就好了。

但是在第二次加载 date 之后2019-01-01 to 2019-06-30,以前的分区没有使用 delta 格式正常加载。

像这样读取我的源增量文件会引发我的错误说

file dosen't exist

game_refined_start = (
    spark.read.format("delta").load("s3://game_events/refined/game_session_start/calendar_date=2018-10-04/")
)

然而,像下面这样的阅读就可以很好地知道可能出了什么问题

spark.conf.set("spark.databricks.delta.formatCheck.enabled", "false")
game_refined_start = (
    spark.read.format("parquet").load("s3://game_events/refined/game_session_start/calendar_date=2018-10-04/")
)
4

1 回答 1

0

如果使用覆盖模式,则它会完全替换以前的数据。您通过 parquet 看到旧数据,因为 delta 不会立即删除旧版本(但如果您使用 parquet,那么它将立即删除数据)。

要解决您的问题 - 您需要使用附加模式。如果您需要获取以前的数据,您可以从表中读取特定版本,并附加它。像这样的东西:

path = "s3://game_events/refined/game_session_start/"
v = spark.sql(f"DESCRIBE HISTORY delta.`{path}` limit 2")
version = v.take(2)[1][0]
df = spark.read.format("delta").option("versionAsOf", version).load(path)
df.write.mode("append").save(path)
于 2021-03-07T14:44:58.300 回答