我有一个结构流作业,它从 Kafka 主题读取消息,然后保存到 dbfs。代码如下:
input_stream = spark.readStream \
.format("kafka") \
.options(**kafka_options) \
.load() \
.transform(create_raw_features)
# tranformation by 7 days rolling window
def transform_func(df):
window_spec = window("event_timestamp", "7 days", "1 day")
return df \
.withWatermark(eventTime="event_timestamp", delayThreshold="2 days") \
.groupBy(window_spec.alias("window"), "customer_id") \
.agg(count("*").alias("count")) \
.select("window.end", "customer_id", "count")
result = input_stream.transform(transform_func)
query = result \
.writeStream \
.format("memory") \
.queryName("test") \
.option("truncate","false").start()
我可以看到检查点工作正常。但是,没有数据输出。
spark.table("test").show(truncate=False)
显示空表。任何线索为什么?