1

我必须处理一些每天收到的文件。该信息具有主键(日期、client_id、operation_id)。所以我创建了一个流,它只将新数据附加到一个增量表中:

operations\
        .repartition('date')\
        .writeStream\
        .outputMode('append')\
        .trigger(once=True)\
        .option("checkpointLocation", "/mnt/sandbox/operations/_chk")\
        .format('delta')\
        .partitionBy('date')\
        .start('/mnt/sandbox/operations')

这工作正常,但我需要总结按(日期,client_id)分组的信息,所以我创建了另一个从这个操作表到新表的流。所以我尝试将我的date字段转换为时间戳,所以我可以在编写聚合流时使用附加模式:

import pyspark.sql.functions as F

summarized= spark.readStream.format('delta').load('/mnt/sandbox/operations')
summarized= summarized.withColumn('timestamp_date',F.to_timestamp('date'))
summarized= summarized.withWatermark('timestamp_date','1 second').groupBy('client_id','date','timestamp_date').agg(<lot of aggs>)

summarized\
        .repartition('date')\
        .writeStream\
        .outputMode('append')\
        .option("checkpointLocation", "/mnt/sandbox/summarized/_chk")\
        .trigger(once=True)\
        .format('delta')\
        .partitionBy('date')\
        .start('/mnt/sandbox/summarized')

此代码运行,但它不会在接收器中写入任何内容。

为什么它不将结果写入接收器?

4

1 回答 1

0

这里可能有两个问题。

格式错误的日期输入

我很确定问题在于F.to_timestamp('date')输入null格式错误。

如果是这样,withWatermark('timestamp_date','1 second')则永远无法“物化”并且不会触发任何输出。

你能spark.read.format('delta').load('/mnt/sandbox/operations')read不要readStream)看看转换是否给出了正确的值吗?

spark.\
  read.\ 
  format('delta').\
  load('/mnt/sandbox/operations').\
  withColumn('timestamp_date',F.to_timestamp('date')).\
  show

所有行使用相同的时间戳

也有可能withWatermark('timestamp_date','1 second')没有完成(因此“完成”聚合),因为所有行都来自相同的时间戳,因此时间不会提前。

您应该有具有不同时间戳的行,以便每个 timestamp_date 的时间概念可以超过'1 second'延迟窗口。

于 2019-09-27T18:03:28.200 回答