I am using Spark Structured Streaming with Azure Databricks Delta, writing into a Delta table (the Delta table name is raw). I read from Azure Files, where I receive out-of-order data, and it has two columns, smtUidNr and msgTs. I am trying to handle duplicates by doing an upsert in my code, but when I query my Delta table raw I still see the following duplicate records in the Delta table:
smtUidNr msgTs
57A94ADA218547DC8AE2F3E7FB14339D 2019-08-26T08:58:46.000+0000
57A94ADA218547DC8AE2F3E7FB14339D 2019-08-26T08:58:46.000+0000
57A94ADA218547DC8AE2F3E7FB14339D 2019-08-26T08:58:46.000+0000
Here is my code:
import org.apache.spark._
import org.apache.spark.sql._
import org.apache.spark.sql.functions._

// merge duplicates
def upsertToDelta(microBatchOutputDF: DataFrame, batchId: Long) {
  microBatchOutputDF.createOrReplaceTempView("updates")
  microBatchOutputDF.sparkSession.sql(s"""
    MERGE INTO raw t
    USING updates s
    ON (s.smtUidNr = t.smtUidNr and s.msgTs > t.msgTs)
    WHEN MATCHED THEN UPDATE SET *
    WHEN NOT MATCHED THEN INSERT *
  """)
}
val df=spark.readStream.format("delta").load("abfss://abc@hjklinfo.dfs.core.windows.net/entrypacket/")
df.createOrReplaceTempView("table1")
val entrypacket_DF = spark.sql("""
  SELECT details as dcl, invdetails as inv, eventdetails as evt, smtdetails as smt, msgHdr.msgTs, msgHdr.msgInfSrcCd
  FROM table1
  LATERAL VIEW explode(dcl) dcl AS details
  LATERAL VIEW explode(inv) inv AS invdetails
  LATERAL VIEW explode(evt) evt AS eventdetails
  LATERAL VIEW explode(smt) smt AS smtdetails
""").dropDuplicates()
entrypacket_DF.createOrReplaceTempView("ucdx")
// Here we add a helper column date_timestamp (msgTs truncated to the date) so that duplicates
// falling on the same day are dropped, and then we drop the helper column again, so msgTs itself
// is never modified.
val resultDF = spark.sql("select dcl.smtUidNr, dcl, inv, evt, smt, cast(msgTs as timestamp) msgTs, msgInfSrcCd from ucdx")
  .withColumn("date_timestamp", to_date(col("msgTs")))
  .dropDuplicates(Seq("smtUidNr", "date_timestamp"))
  .drop("date_timestamp")
resultDF.createOrReplaceTempView("final_tab")
val finalDF=spark.sql("select distinct smtUidNr,max(dcl) as dcl,max(inv) as inv,max(evt) as evt,max(smt) as smt,max(msgTs) as msgTs,max(msgInfSrcCd) as msgInfSrcCd from final_tab group by smtUidNr")
finalDF.writeStream.format("delta").foreachBatch(upsertToDelta _).outputMode("update").start()
Is it that Structured Streaming does not support aggregations, window functions, and ORDER BY clauses? What can I change in my code so that I end up with only one record per smtUidNr?
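Edit: one direction I am considering (just a rough sketch, not tested; it assumes the Delta Lake Scala API io.delta.tables.DeltaTable is available on my cluster and that the target table is registered as raw) is to deduplicate each micro-batch down to the latest msgTs per smtUidNr inside foreachBatch, match only on the key in the ON clause, and move the timestamp comparison into the WHEN MATCHED condition:

import io.delta.tables.DeltaTable
import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions._

// Keep only the newest msgTs per smtUidNr within the micro-batch, then merge on the key alone,
// so an existing smtUidNr can only ever be updated (never re-inserted as a duplicate).
def upsertLatestToDelta(microBatchOutputDF: DataFrame, batchId: Long) {
  val w = Window.partitionBy("smtUidNr").orderBy(col("msgTs").desc)
  val latest = microBatchOutputDF
    .withColumn("rn", row_number().over(w))
    .filter(col("rn") === 1)
    .drop("rn")

  DeltaTable.forName(microBatchOutputDF.sparkSession, "raw").as("t")
    .merge(latest.as("s"), "s.smtUidNr = t.smtUidNr")
    .whenMatched("s.msgTs > t.msgTs").updateAll()
    .whenNotMatched().insertAll()
    .execute()
}

Would something along these lines be the right way to guarantee a single record per smtUidNr?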