我必须处理一些每天收到的文件。信息有主键(date,client_id,operation_id)
。所以我创建了一个流,它只将新数据附加到一个增量表中:
operations\
.repartition('date')\
.writeStream\
.outputMode('append')\
.trigger(once=True)\
.option("checkpointLocation", "/mnt/sandbox/operations/_chk")\
.format('delta')\
.partitionBy('date')\
.start('/mnt/sandbox/operations')
这工作正常,但我需要总结这些信息分组(date,client_id)
,所以我创建了另一个从这个操作表到一个新表的流:
summarized= spark.readStream.format('delta').load('/mnt/sandbox/operations')
summarized= summarized.groupBy('client_id','date').agg(<a lot of aggs>)
summarized.repartition('date')\
.writeStream\
.outputMode('complete')\
.trigger(once=True)\
.option("checkpointLocation", "/mnt/sandbox/summarized/_chk")\
.format('delta')\
.partitionBy('date')\
.start('/mnt/sandbox/summarized')
这是有效的,但每次我将新数据放入operations
表中时,sparksummarized
都会重新计算。我尝试在第二个流媒体上使用附加模式,但它需要水印,并且日期是 DateType。
有一种方法可以仅根据组键计算新聚合并将它们附加到summarized
?