这是我的场景:
我有一个将 json 数据发布到它的 kinesis 流数据帧。
我正在尝试按 3 个流过滤此 json 数据类型,并通过这 3 个流将每种类型写入 3 个位置。
有第 4 个流,它使用第一种类型的 DF 并将 2 个静态 dfs jdbc 表连接到它,将其写入最终的 jdbc 表。
#Kinesis Streaming Dataframe
KStr_DF = spark.readStream.format("kinesis").option("initialPosition", "LATEST").load()
#3 streaming dataframes filtered
Str_DF_A = KStr_DF.where('requestData.type == "typeA"')
Str_DF_B = KStr_DF.where('requestData.type == "typeB"')
Str_DF_C = KStr_DF.where('requestData.type == "typeC"')
Str_DF_A.writeStream()\
.foreachBatch(lambda df,epochId:rows_writer(df,epochId,loc1,loc2, loc3))\
.option("checkpointLocation",fpath1).start()
Str_DF_B.writeStream()\
.foreachBatch(lambda df,epochId:rows_writer(df,epochId,loc1,loc2, loc3))\
.option("checkpointLocation",fpath2).start()
Str_DF_C.writeStream()\
.foreachBatch(lambda df,epochId:rows_writer(df,epochId,loc1,loc2, loc3))\
.option("checkpointLocation",fpath3).start()
#read static tables and join to first stream, then create 4th stream
DFstaticD=spark.read.format("jdbc").load(tableD)
DFstaticE=spark.read.format("jdbc").load(tableE) #jdbc DF placeholder
#Stream Static joins
StrStatTypeAD = Str_DF_A.join(DFstaticA,on='col1',type="left-outer")
StrStatTypeADE = StrStatTypeAD.join(DFstaticE,on='col1',type="inner")
#stream 4
StrStatTypeADE.writeStream\
.trigger(processingTime='10 seconds')\ #trigger of any use??
.foreach(foreach_rows_writer(jdbc_url))\
.option("checkpointLocation", fpath).outputMode("append").start()
def rows_writer(batchdata, epochId, location1, location2, location3):
batchdata.persist()
batchdata.write().format(loc1).mode("append").save(location1)
batchdata.write().format(loc2).mode("append").save(location2)
batchdata.write().format(loc3).mode("append").save(location3)
batchdata.unpersist()
class foreach_rows_writer:
#jdbc writes for inserts/updates to table D, left joined with Str_DF_A earlier
def __init__(self,jdbc_url):
def open(self, partition_id, epoch_id):
#if connects return True else False
def process(self, row):
#cursor update or inserts
我的问题,对于 spark 3.1.x
- 基于此,所有前 3 个流分别从 kinesis 重新轮询/读取。3.1.x 仍然如此吗?或者他们是否只通过单个流数据帧 KStr_DF 从 kinesis 读取一次?
- 第 4 个流使用流 1 使用的相同数据帧,然后将其与其他静态数据帧连接,分别执行第 1/4 次读取/轮询运动,或者仅执行一次并通过 Str_DF_A 使用相同的偏移量?
- 连接上缺少水印会导致过度的运动轮询吗?或者,如果我要在没有水印的情况下对列上的输入流运动进行重复数据删除?
- 将 trigger(processingTime='10 seconds') 与 foreach(stream 4) 一起使用有什么意义吗?