2

我在位置“/mnt/events-bronze”有一个青铜级三角洲湖表(events_bronze),数据从kafka流式传输到该表。现在我希望能够从该表中流式传输并使用“foreachBatch”更新到银表(events_silver”。这可以使用青铜表作为源来实现。但是,在初始运行期间,由于 events_silver 不存在,我不断收到错误说 Delta 表不存在,这很明显。那么我该如何创建与 events_bronze 具有相同结构的 events_silver?我找不到 DDL 来做同样的事情。

def upsertToDelta(microBatchOutputDF: DataFrame, batchId: Long) {
  DeltaTable.forPath(spark, "/mnt/events-silver").as("silver")
    .merge(
      microBatchOutputDF.as("bronze"),
      "silver.id=bronze.id")
    .whenMatched().updateAll()
    .whenNotMatched().insertAll()
    .execute()
}
 events_bronze
      .writeStream
      .trigger(Trigger.ProcessingTime("120 seconds"))
      .format("delta")
      .foreachBatch(upsertToDelta _)
      .outputMode("update")
      .start()

在初始运行期间,问题是没有为路径“/mnt/events-silver”定义增量湖表。我不确定如何在第一次运行时创建与“/mnt/events-bronze”相同的结构。

4

4 回答 4

7

在开始流写入/合并之前,检查表是否已经存在。如果不使用空数据框和架构(events_bronze)创建一个

  val exists = DeltaTable.isDeltaTable("/mnt/events-silver")

  if (!exists) {
    val emptyDF = spark.createDataFrame(spark.sparkContext.emptyRDD[Row], <schema of events_bronze>)
    emptyDF
      .write
      .format("delta")
      .mode(SaveMode.Overwrite)
      .save("/mnt/events-silver")
  }

表(delta 湖元数据)将在开始时仅创建一次,如果它不存在。在作业重新启动的情况下,它将存在并跳过表创建

于 2020-06-18T18:48:11.600 回答
4

从 Delta Lake 的 1.0.0 版开始,添加了DeltaTable.createIfNotExists()方法(Evolving API)。

在您的示例DeltaTable.forPath(spark, "/mnt/events-silver")中,可以替换为:

DeltaTable.createIfNotExists(spark)
  .location("/mnt/events-silver")
  .addColumns(microBatchOutputDF.schema)
  .execute

您必须注意不要提供.option("checkpointLocation", "/mnt/events-silver/_checkpoint")checkpointLocation 是 DeltaTable 位置中的子目录的位置。这将导致在 DeltaTable 之前创建 _checkpoint 目录,并且在尝试创建 DeltaTable 时将引发异常。

于 2021-08-12T21:17:14.533 回答
1

这是一个 pyspark 示例:

from pyspark.sql.types import StructType, StructField, StringType, TimestampType
from delta.tables import DeltaTable

basePath = 'abfss://stage2@your_storage_account_name.dfs.core.windows.net'
schema = StructType([StructField('SignalType', StringType()),StructField('StartTime', TimestampType())])

if not DeltaTable.isDeltaTable(spark, basePath + '/tutorial_01/test1'):
    emptyDF = spark.createDataFrame(spark.sparkContext.emptyRDD(), schema)
    emptyDF.write.format('delta').mode('overwrite').save(basePath + '/tutorial_01/test1')
于 2021-07-21T18:53:33.567 回答
0

您可以使用 spark SQL 检查表。首先在 spark SQL 上运行,它将给出青铜表的表定义:

spark.sql("show create table event_bronze").show

获得 DDL 后,只需将位置更改为银表的路径并运行该语句就是 spark SQL。

注意:使用“如果不存在则创建表......”因为它不会在并发运行中失败。

于 2019-10-04T22:31:08.477 回答