0

我必须处理一些每天收到的文件。信息有主键(date,client_id,operation_id)。所以我创建了一个流,它只将新数据附加到一个增量表中:

operations\
        .repartition('date')\
        .writeStream\
        .outputMode('append')\
        .trigger(once=True)\
        .option("checkpointLocation", "/mnt/sandbox/operations/_chk")\
        .format('delta')\
        .partitionBy('date')\
        .start('/mnt/sandbox/operations')

这工作正常,但我需要总结这些信息分组(date,client_id),所以我创建了另一个从这个操作表到一个新表的流:

summarized= spark.readStream.format('delta').load('/mnt/sandbox/operations')

summarized= summarized.groupBy('client_id','date').agg(<a lot of aggs>)

summarized.repartition('date')\
        .writeStream\
        .outputMode('complete')\
        .trigger(once=True)\
        .option("checkpointLocation", "/mnt/sandbox/summarized/_chk")\
        .format('delta')\
        .partitionBy('date')\
        .start('/mnt/sandbox/summarized')

这是有效的,但每次我将新数据放入operations表中时,sparksummarized都会重新计算。我尝试在第二个流媒体上使用附加模式,但它需要水印,并且日期是 DateType。

有一种方法可以仅根据组键计算新聚合并将它们附加到summarized?

4

1 回答 1

0

您需要使用Spark Structured Streaming - Window Operations

当您使用窗口操作时,它将根据windowDuration和进行分桶slideDurationwindowDuration告诉您窗口的长度是多少,并slideDuration告诉您应该滑动窗口多长时间。

如果您使用window() [docs]进行分组,您将获得一个结果window列以及您分组的其他列,例如client_id

例如:

windowDuration = "10 minutes"
slideDuration = "5 minutes"
summarized = before_summary.groupBy(before_summary.client_id,
    window(before_summary.date, windowDuration, slideDuration)
).agg(<a lot of aggs>).orderBy('window')
于 2019-09-25T15:19:35.297 回答