15

我使用 Spark 2.2.0-rc1。

我有一个 Kafka ,我正在查询一个正在运行的带有topic水印的聚合,带有1 minute水印,输出模式。consoleappend

import org.apache.spark.sql.types._
val schema = StructType(StructField("time", TimestampType) :: Nil)
val q = spark.
  readStream.
  format("kafka").
  option("kafka.bootstrap.servers", "localhost:9092").
  option("startingOffsets", "earliest").
  option("subscribe", "topic").
  load.
  select(from_json(col("value").cast("string"), schema).as("value"))
  select("value.*").
  withWatermark("time", "1 minute").
  groupBy("time").
  count.
  writeStream.
  outputMode("append").
  format("console").
  start

我在 Kafka 中推送以下数据topic

{"time":"2017-06-07 10:01:00.000"}
{"time":"2017-06-07 10:02:00.000"}
{"time":"2017-06-07 10:03:00.000"}
{"time":"2017-06-07 10:04:00.000"}
{"time":"2017-06-07 10:05:00.000"}

我得到以下输出:

scala> -------------------------------------------
Batch: 0
-------------------------------------------
+----+-----+                                                                    
|time|count|
+----+-----+
+----+-----+

-------------------------------------------
Batch: 1
-------------------------------------------
+----+-----+                                                                    
|time|count|
+----+-----+
+----+-----+

-------------------------------------------
Batch: 2
-------------------------------------------
+----+-----+                                                                    
|time|count|
+----+-----+
+----+-----+

-------------------------------------------
Batch: 3
-------------------------------------------
+----+-----+                                                                    
|time|count|
+----+-----+
+----+-----+

-------------------------------------------
Batch: 4
-------------------------------------------
+----+-----+                                                                    
|time|count|
+----+-----+
+----+-----+

这是预期的行为吗?

4

2 回答 2

8

将更多数据推送到 Kafka 应该会触发 Spark 输出一些东西。当前的行为完全是由于内部实现。

当你推送一些数据时,StreamingQuery 会生成一个批处理来运行。当此批次完成时,它将记住此批次中的最大事件时间。然后在下一批中,因为您使用的是append模式,StreamingQuery 将使用最大事件时间和水印将旧值从 StateStore 中逐出并输出。因此,您需要确保至少生成两个批次才能看到输出。

于 2017-06-08T20:19:27.350 回答
5

这是我最好的猜测:

附加模式仅在水印过去后输出数据(例如在这种情况下为 1 分钟后)。您没有设置触发器(例如.trigger(Trigger.ProcessingTime("10 seconds")),因此默认情况下它会尽快输出批次。所以在第一分钟你的所有批次都应该是空的,一分钟后的第一批应该包含一些内容。

另一种可能性是您正在使用groupBy("time")而不是groupBy(window("time", "[window duration]")). 我相信水印是要与时间窗口或 mapGroupsWithState 一起使用的,所以我不是这种情况下交互的工作方式。

于 2017-08-08T22:56:52.320 回答