4

我目前正在探索由 databricks 开源的 delta Lake。我正在使用 delta Lake 格式读取 kafka 数据并以流形式写入。Delta Lake 在从 kafka 进行流式写入期间创建了许多文件,我觉得这些文件是心脏 hdfs 文件系统。

我曾尝试将多个文件压缩为单个文件。

val spark =  SparkSession.builder
    .master("local")
    .appName("spark session example")
    .getOrCreate()

  val df = spark.read.parquet("deltalakefile/data/")

  df.repartition(1).write.format("delta").mode("overwrite").save("deltalakefile/data/")
  df.show()

  spark.conf.set("spark.databricks.delta.retentionDurationCheck.enabled","false")

  DeltaTable.forPath("deltalakefile/data/").vacuum(1)

但是当我检查输出时,它正在创建新文件而不是删除任何现有文件。

有没有办法做到这一点。还有这里的保留期是什么关系?使用的时候我们应该如何在HDFS中配置呢?当我想构建具有 delta Lake 格式的原始/青铜层并且我想长期保存我的所有数据(本地数年/云上无限时间)时,我的保留配置应该是什么?

4

1 回答 1

5

按照设计,Delta 不会立即删除文件以防止活跃的消费者受到影响。它还提供版本控制(又称时间旅行),因此您可以在必要时查看历史记录。要删除以前的版本或未提交的文件,您需要运行Vacuum

import io.delta.tables._

val deltaTable = DeltaTable.forPath(spark, pathToTable)

deltaTable.vacuum() // use default retention period

关于如何管理青铜/银/金模型的保留和压实的问题,您应该将登陆表(又名青铜)视为仅附加日志。这意味着您不需要在事后执行压缩或任何重写。青铜表应该是您从上游数据源(例如 Kafka)摄取的数据的记录,并且应用了最少的处理。

青铜表通常用作增量流源来填充下游数据集。鉴于从 Delta 读取是从事务日志中完成的,与使用执行慢速文件列表的标准文件读取器相比,小文件不是这样的问题。

但是,当您将文件写入青铜表时,仍然有一些选项可以优化文件:1)通过首先重新分区以减少文件数量,在写入 Delta 时压缩您的 Kafka 消息,2)增加触发间隔,所以摄取的运行频率较低,并且正在将更多消息写入更大的文件。

于 2019-10-18T02:44:23.297 回答