我使用 Spark 的结构化流传输来自 Kafka 的消息。然后聚合数据,并以附加模式写入内存接收器。但是,当我尝试查询内存时,它什么也没返回。下面是代码:
result = model
.withColumn("timeStamp", col("startTimeStamp").cast("timestamp"))
.withWatermark("timeStamp", "5 minutes")
.groupBy(window(col("timeStamp"), "5 minutes").alias("window"))
.agg(
count("*").alias("total")
);
// writing to memory
StreamingQuery query = result.writeStream()
.outputMode(OutputMode.Append())
.queryName("datatable")
.format("memory")
.start();
// query data in memory
new Timer().scheduleAtFixedRate(new TimerTask() {
@Override
public void run() {
sparkSession.sql("SELECT * FROM datatable").show();
}
}, 10000, 10000);
结果总是:
|window|total|
+------+-----+
+------+-----+
如果我使用outputMode = complete
,那么我可以获得聚合数据。但这不是我的选择,因为要求是使用附加模式。
代码有问题吗?谢谢,