我在位置“/mnt/events-bronze”有一个青铜级三角洲湖表(events_bronze),数据从kafka流式传输到该表。现在我希望能够从该表中流式传输并使用“foreachBatch”更新到银表(events_silver”。这可以使用青铜表作为源来实现。但是,在初始运行期间,由于 events_silver 不存在,我不断收到错误说 Delta 表不存在,这很明显。那么我该如何创建与 events_bronze 具有相同结构的 events_silver?我找不到 DDL 来做同样的事情。
def upsertToDelta(microBatchOutputDF: DataFrame, batchId: Long) {
DeltaTable.forPath(spark, "/mnt/events-silver").as("silver")
.merge(
microBatchOutputDF.as("bronze"),
"silver.id=bronze.id")
.whenMatched().updateAll()
.whenNotMatched().insertAll()
.execute()
}
events_bronze
.writeStream
.trigger(Trigger.ProcessingTime("120 seconds"))
.format("delta")
.foreachBatch(upsertToDelta _)
.outputMode("update")
.start()
在初始运行期间,问题是没有为路径“/mnt/events-silver”定义增量湖表。我不确定如何在第一次运行时创建与“/mnt/events-bronze”相同的结构。