0

我有一个流式传输作业,将数据流式传输到 databricks spark 中的 delta 湖中,并且我试图在流式传输时删除重复项,因此我的 delta 数据没有重复项。这是我到目前为止所拥有的:

inputPath = "my_input_path"

schema = StructType("some_schema")

eventsDF = (
  spark
    .readStream
    .schema(schema)
    .option("header", "true")
    .option("maxFilesPerTrigger", 1)
    .csv(inputPath)
)

def upsertToDelta(eventsDF, batchId): 
  eventsDF.createOrReplaceTempView("updates")

  eventsDF._jdf.sparkSession().sql("""
    MERGE INTO eventsDF t
    USING updates s
    ON s.deviceId = t.deviceId
    WHEN NOT MATCHED THEN INSERT *
  """)


writePath = "my_write_path"
checkpointPath = writePath + "/_checkpoint"

deltaStreamingQuery = (eventsDF
  .writeStream
  .format("delta")
  .foreachBatch(upsertToDelta)
  .option("checkpointLocation", checkpointPath)
  .outputMode("append")
  .queryName("test")
  .start(writePath)
)

我收到错误:py4j.protocol.Py4JJavaError: An error occurred while calling o398.sql. : org.apache.spark.sql.AnalysisException: Table or view not found: eventsDF; line 2 pos 4

但我刚刚开始流式传输这些数据,还没有创建任何表。

4

1 回答 1

0

我发现您的代码有 2 个问题:

  1. 当您在函数 upsertToDelta 中调用 Merge 语句时。Spark 期待一个可以合并“更新” tempView 的目标表。

在代码中:

MERGE INTO eventsDF t 使用更新 s ON s.deviceId = t.deviceId WHEN NOT MATCHED THEN INSERT *

eventsDF 应该是目标表名。

  1. 上面的代码本身会将 tempView 与目标表连接起来,并在不匹配时插入到目标表中。

因此,不需要 start() 中的 writePath。.start(writePath)

请注意:如果需要,您还可以在合并代码中添加更新选项。

于 2021-05-19T19:41:12.900 回答