2

我使用 Spark 的结构化流传输来自 Kafka 的消息。然后聚合数据,并以附加模式写入内存接收器。但是,当我尝试查询内存时,它什么也没返回。下面是代码:

result  =    model
            .withColumn("timeStamp", col("startTimeStamp").cast("timestamp"))
            .withWatermark("timeStamp", "5 minutes")
            .groupBy(window(col("timeStamp"), "5 minutes").alias("window"))
            .agg(
                count("*").alias("total")
            );

    // writing to memory
    StreamingQuery query = result.writeStream()
                .outputMode(OutputMode.Append())
                .queryName("datatable")
                .format("memory")
                .start();

    // query data in memory
    new Timer().scheduleAtFixedRate(new TimerTask() {
        @Override
        public void run() {
            sparkSession.sql("SELECT * FROM datatable").show();
        }
    }, 10000, 10000);

结果总是:

|window|total|
+------+-----+
+------+-----+

如果我使用outputMode = complete,那么我可以获得聚合数据。但这不是我的选择,因为要求是使用附加模式。

代码有问题吗?谢谢,

4

1 回答 1

2

在追加模式下,

窗口聚合的输出延迟了指定的延迟阈值withWatermark()

在你的情况下,延迟是 5 分钟,我对你的输入数据一无所知,但我想你可能需要等待 5 分钟。

我建议您阅读(再次?)结构化流的文档

于 2017-02-28T20:34:17.170 回答