I have a streaming job that streams data into a Delta lake in Databricks Spark, and I'm trying to drop duplicates while streaming so that my Delta data ends up with no duplicates. This is what I have so far:
from pyspark.sql.types import StructType

inputPath = "my_input_path"
schema = StructType("some_schema")  # placeholder for my real schema

eventsDF = (
    spark
        .readStream
        .schema(schema)
        .option("header", "true")
        .option("maxFilesPerTrigger", 1)
        .csv(inputPath)
)
def upsertToDelta(eventsDF, batchId):
    # Register this micro-batch so the SQL below can reference it
    eventsDF.createOrReplaceTempView("updates")
    # Insert only rows whose deviceId isn't already in the target
    eventsDF._jdf.sparkSession().sql("""
        MERGE INTO eventsDF t
        USING updates s
        ON s.deviceId = t.deviceId
        WHEN NOT MATCHED THEN INSERT *
    """)
writePath = "my_write_path"
checkpointPath = writePath + "/_checkpoint"

deltaStreamingQuery = (
    eventsDF
        .writeStream
        .format("delta")
        .foreachBatch(upsertToDelta)
        .option("checkpointLocation", checkpointPath)
        .outputMode("append")
        .queryName("test")
        .start(writePath)
)
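For what it's worth, I also wondered whether I should express the merge through the delta.tables Python API instead of going through eventsDF._jdf. A minimal sketch of what I mean, assuming the target Delta table already exists at writePath:

from delta.tables import DeltaTable

def upsertToDelta(microBatchDF, batchId):
    # Merge the micro-batch into the target table, inserting only
    # rows whose deviceId is not already present
    (DeltaTable.forPath(spark, writePath).alias("t")
        .merge(microBatchDF.alias("s"), "s.deviceId = t.deviceId")
        .whenNotMatchedInsertAll()
        .execute())

Either way, I run into the same underlying question: what exactly is the merge target supposed to be?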
I'm getting this error:

py4j.protocol.Py4JJavaError: An error occurred while calling o398.sql.
: org.apache.spark.sql.AnalysisException: Table or view not found: eventsDF; line 2 pos 4

But I've only just started streaming this data and haven't created any table yet.
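My guess is that MERGE INTO eventsDF is being parsed as a reference to a table named eventsDF (which doesn't exist) rather than to my DataFrame, so I'd have to create the target Delta table before starting the stream. Would something like this sketch be the right idea? (The table name events is just a placeholder of mine, and DeltaTable.createIfNotExists needs a reasonably recent Delta Lake version.)

from delta.tables import DeltaTable

# Create an empty Delta table at writePath up front so MERGE has a target
(DeltaTable.createIfNotExists(spark)
    .tableName("events")
    .addColumns(schema)
    .location(writePath)
    .execute())

Is that the right direction, or am I misunderstanding what foreachBatch expects here?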