I'm using Spark 2.2.0-rc1.

I have a Kafka topic that I'm querying with a running watermarked aggregation, using a 1 minute watermark, writing to the console sink in append output mode.
import org.apache.spark.sql.functions._ // from_json, col
import org.apache.spark.sql.types._

val schema = StructType(StructField("time", TimestampType) :: Nil)
val q = spark.
readStream.
format("kafka").
option("kafka.bootstrap.servers", "localhost:9092").
option("startingOffsets", "earliest").
option("subscribe", "topic").
load.
select(from_json(col("value").cast("string"), schema).as("value")).
select("value.*").
withWatermark("time", "1 minute").
groupBy("time").
count.
writeStream.
outputMode("append").
format("console").
start
I push the following data into the Kafka topic:
{"time":"2017-06-07 10:01:00.000"}
{"time":"2017-06-07 10:02:00.000"}
{"time":"2017-06-07 10:03:00.000"}
{"time":"2017-06-07 10:04:00.000"}
{"time":"2017-06-07 10:05:00.000"}
I get the following output:
scala> -------------------------------------------
Batch: 0
-------------------------------------------
+----+-----+
|time|count|
+----+-----+
+----+-----+
-------------------------------------------
Batch: 1
-------------------------------------------
+----+-----+
|time|count|
+----+-----+
+----+-----+
-------------------------------------------
Batch: 2
-------------------------------------------
+----+-----+
|time|count|
+----+-----+
+----+-----+
-------------------------------------------
Batch: 3
-------------------------------------------
+----+-----+
|time|count|
+----+-----+
+----+-----+
-------------------------------------------
Batch: 4
-------------------------------------------
+----+-----+
|time|count|
+----+-----+
+----+-----+
Is this the expected behavior?