kafka 的结构化流式处理将偏移量保存到结构下方的 HDFS。
示例 checkpointLocation 设置如下。
.writeStream.
.....
option("checkpointLocation", "/tmp/checkPoint")
.....
在这种情况下,kafka 的结构化流式传输保存在路径下方
/tmp/checkPoint/offsets/$'batchid'
保存的文件包含以下格式。
v1
{"batchWatermarkMs":0,"batchTimestampMs":$'timestamp',"conf":{"spark.sql.shuffle.partitions":"200"}}
{"Topic1WithPartiton1":{"0":$'OffsetforTopic1ForPartition0'},"Topic2WithPartiton2":{"1":$'OffsetforTopic2ForPartition1',"0":$'OffsetforTopic2ForPartition1'}}
例如。
v1
{"batchWatermarkMs":0,"batchTimestampMs":1505718000115,"conf":{"spark.sql.shuffle.partitions":"200"}}
{"Topic1WithPartiton1":{"0":21482917},"Topic2WithPartiton2":{"1":103557997,"0":103547910}}
所以,我认为对于监控偏移滞后,它需要开发具有以下功能的自定义工具。
- 从 HDFS 的偏移量中读取。
- 将偏移量写入 Kafka __offset 主题。
这样,已经存在的偏移滞后监控工具可以监控结构化流中的 kafka 的偏移滞后。