I have a streaming job that streams data into a Delta lake in Databricks Spark, and I am trying to drop duplicates while streaming so that my Delta data ends up with no duplicates. This is what I have so far:
from pyspark.sql.types import StructType

inputPath = "my_input_path"
schema = StructType("some_schema")  # placeholder for my actual schema
eventsDF = (
  spark
    .readStream
    .schema(schema)
    .option("header", "true")
    .option("maxFilesPerTrigger", 1)
    .csv(inputPath)
)
def upsertToDelta(eventsDF, batchId):
  # Expose this micro-batch as a temp view so the SQL below can reference it
  eventsDF.createOrReplaceTempView("updates")
  # Insert only rows whose deviceId is not already present in the target
  eventsDF._jdf.sparkSession().sql("""
    MERGE INTO eventsDF t
    USING updates s
    ON s.deviceId = t.deviceId
    WHEN NOT MATCHED THEN INSERT *
  """)
writePath = "my_write_path"
checkpointPath = writePath + "/_checkpoint"
deltaStreamingQuery = (eventsDF
  .writeStream
  .format("delta")
  .foreachBatch(upsertToDelta)
  .option("checkpointLocation", checkpointPath)
  .outputMode("append")
  .queryName("test")
  .start(writePath)
)
I am getting the error:

py4j.protocol.Py4JJavaError: An error occurred while calling o398.sql.
: org.apache.spark.sql.AnalysisException: Table or view not found: eventsDF; line 2 pos 4

But I have only just started streaming this data and have not created any tables yet.
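
My guess is that MERGE INTO eventsDF is being parsed as a reference to a table named eventsDF, which does not exist: the only view I register is updates, and the eventsDF inside the function is just a Python variable that the SQL parser cannot see. Below is a minimal, untested sketch of the direction I am considering; the empty-table bootstrap and the path-based delta.`...` target syntax are my assumptions, not something from my current code:

# Create an empty Delta table at writePath once, before the stream starts,
# so that a MERGE target exists when the first micro-batch arrives.
(spark.createDataFrame([], schema)
  .write
  .format("delta")
  .mode("ignore")  # no-op if the table already exists
  .save(writePath))

def upsertToDelta(microBatchDF, batchId):
  microBatchDF.createOrReplaceTempView("updates")
  # Target the Delta table by its storage path rather than by a table name
  microBatchDF._jdf.sparkSession().sql(f"""
    MERGE INTO delta.`{writePath}` t
    USING updates s
    ON s.deviceId = t.deviceId
    WHEN NOT MATCHED THEN INSERT *
  """)

I am also unsure whether .format("delta") and .start(writePath) belong in the writeStream at all when using foreachBatch, since the batch function would be doing the actual writing; my understanding is that .start() with just the checkpoint location would be enough in that case. Or would a plain eventsDF.dropDuplicates(["deviceId"]) before the writeStream be a simpler way to get what I want?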