我目前正在探索由 databricks 开源的 delta Lake。我正在使用 delta Lake 格式读取 kafka 数据并以流形式写入。Delta Lake 在从 kafka 进行流式写入期间创建了许多文件,我觉得这些文件是心脏 hdfs 文件系统。
我曾尝试将多个文件压缩为单个文件。
val spark = SparkSession.builder
.master("local")
.appName("spark session example")
.getOrCreate()
val df = spark.read.parquet("deltalakefile/data/")
df.repartition(1).write.format("delta").mode("overwrite").save("deltalakefile/data/")
df.show()
spark.conf.set("spark.databricks.delta.retentionDurationCheck.enabled","false")
DeltaTable.forPath("deltalakefile/data/").vacuum(1)
但是当我检查输出时,它正在创建新文件而不是删除任何现有文件。
有没有办法做到这一点。还有这里的保留期是什么关系?使用的时候我们应该如何在HDFS中配置呢?当我想构建具有 delta Lake 格式的原始/青铜层并且我想长期保存我的所有数据(本地数年/云上无限时间)时,我的保留配置应该是什么?