我必须处理一些每天收到的文件。该信息具有主键(日期、client_id、operation_id)。所以我创建了一个流,它只将新数据附加到一个增量表中:
operations\
.repartition('date')\
.writeStream\
.outputMode('append')\
.trigger(once=True)\
.option("checkpointLocation", "/mnt/sandbox/operations/_chk")\
.format('delta')\
.partitionBy('date')\
.start('/mnt/sandbox/operations')
这工作正常,但我需要总结按(日期,client_id)分组的信息,所以我创建了另一个从这个操作表到新表的流。所以我尝试将我的date
字段转换为时间戳,所以我可以在编写聚合流时使用附加模式:
import pyspark.sql.functions as F
summarized= spark.readStream.format('delta').load('/mnt/sandbox/operations')
summarized= summarized.withColumn('timestamp_date',F.to_timestamp('date'))
summarized= summarized.withWatermark('timestamp_date','1 second').groupBy('client_id','date','timestamp_date').agg(<lot of aggs>)
summarized\
.repartition('date')\
.writeStream\
.outputMode('append')\
.option("checkpointLocation", "/mnt/sandbox/summarized/_chk")\
.trigger(once=True)\
.format('delta')\
.partitionBy('date')\
.start('/mnt/sandbox/summarized')
此代码运行,但它不会在接收器中写入任何内容。
为什么它不将结果写入接收器?