0

我有一个结构流作业,它从 Kafka 主题读取消息,然后保存到 dbfs。代码如下:

input_stream = spark.readStream \
  .format("kafka") \
  .options(**kafka_options) \
  .load() \
  .transform(create_raw_features)

# tranformation by 7 days rolling window
def transform_func(df):
  window_spec = window("event_timestamp", "7 days", "1 day")
  return df \
          .withWatermark(eventTime="event_timestamp", delayThreshold="2 days") \
          .groupBy(window_spec.alias("window"), "customer_id") \
          .agg(count("*").alias("count")) \
          .select("window.end", "customer_id", "count")

result = input_stream.transform(transform_func)

query = result \
    .writeStream \
    .format("memory") \
    .queryName("test") \
    .option("truncate","false").start()

我可以看到检查点工作正常。但是,没有数据输出。

spark.table("test").show(truncate=False)

显示空表。任何线索为什么?

4

1 回答 1

1

我发现了这个问题。在 Spark 文档输出模式部分,它指出:

附加模式使用水印来丢弃旧的聚合状态。但是窗口聚合的输出延迟了 withWatermark() 中指定的后期阈值,因为模式语义,行在最终确定后(即越过水印之后)只能添加到结果表一次。

由于我没有明确指定输出模式,append因此是隐式应用的,这意味着只有在水印阈值通过后才会出现第一个输出。

要获得每个微批次的输出,请使用输出模式updatecomplete代替。

这现在对我有用

query = result \
    .writeStream \
    .format("memory") \
    .outputMode("update") \
    .queryName("test") \
    .option("truncate","false").start()
于 2021-08-26T16:54:22.990 回答