我正在使用 Spark 结构化流从 Kafka 主题中读取记录;我打算计算 Spark 中每个“微批次”中收到的记录数readstream
这是一个片段:
val kafka_df = sparkSession
.readStream
.format("kafka")
.option("kafka.bootstrap.servers", "host:port")
.option("subscribe", "test-count")
.load()
我从文档中了解到 kafka_df 将在 astreamingQuery
启动(接下来)时被懒惰地评估,并且在评估时,它拥有一个微批次。所以,我想先做一个主题,然后做一个应该工作groupBy
。count
像这样:
val counter = kafka_df
.groupBy("topic")
.count()
现在要评估所有这些,我们需要一个 streaminQuery,比如说,一个控制台接收器查询以在控制台上打印它。这就是我看到问题的地方。DataFrames上的流式aggregate
查询,例如kafka_df
仅适用于outputMode
complete/update而不适用于append。
这实际上意味着,streamingQuery 报告的计数是累积的。
像这样:
val counter_json = counter.toJSON //to jsonify
val count_query = counter_json
.writeStream.outputMode("update")
.format("console")
.start() // kicks of lazy evaluation
.awaitTermination()
在受控设置中,其中:
实际发布记录:1500
实际接收微批次:3
实际接收记录:1500
每个微批次的计数应该是500,所以我希望(希望)查询打印到控制台:
主题:测试
计数:500
主题:测试
计数:500
主题:测试
计数:500
但事实并非如此。它实际上打印:
主题:测试
计数:500
主题:测试
计数:1000
主题:测试
计数:1500
我理解这是因为'outputMode'完成/更新(累积)
我的问题:是否有可能准确地得到每个微批次的计数是 Spark-Kafka 结构化流式传输?
从文档中,我发现了水印方法(支持追加):
val windowedCounts = kafka_df
.withWatermark("timestamp", "10 seconds")
.groupBy(window($"timestamp", "10 seconds", "10 seconds"), $"topic")
.count()
val console_query = windowedCounts
.writeStream
.outputMode("append")
.format("console")
.start()
.awaitTermination()
但是这样的结果console_query
是不准确的,而且看起来很离谱。
TL;DR - 任何关于准确计算 Spark-Kafka 微批处理中记录的想法都将不胜感激。