0

我正在使用 Kafka (Integration guide)的结构化流源,如上所述,它不会提交任何偏移量。

我的目标之一是监控它(检查它是否落后等)。即使它没有提交偏移量,它也会通过不时查询 kafka 并检查下一个要处理的偏移量来处理它们。根据文档,偏移量被写入 HDFS,因此在发生故障时可以恢复,但问题是:

它们存储在哪里?如果不提交偏移量,是否有任何方法可以监控火花流(结构化)的 kafka 消耗(从程序外部;所以 kafka cli 或类似的,每条记录附带的偏移量不适合用例) ?

干杯

4

3 回答 3

5

kafka 的结构化流式处理将偏移量保存到结构下方的 HDFS。

示例 checkpointLocation 设置如下。

.writeStream.
.....
  option("checkpointLocation", "/tmp/checkPoint")
.....

在这种情况下,kafka 的结构化流式传输保存在路径下方

/tmp/checkPoint/offsets/$'batchid'

保存的文件包含以下格式。

v1
{"batchWatermarkMs":0,"batchTimestampMs":$'timestamp',"conf":{"spark.sql.shuffle.partitions":"200"}}
{"Topic1WithPartiton1":{"0":$'OffsetforTopic1ForPartition0'},"Topic2WithPartiton2":{"1":$'OffsetforTopic2ForPartition1',"0":$'OffsetforTopic2ForPartition1'}}

例如。

v1
{"batchWatermarkMs":0,"batchTimestampMs":1505718000115,"conf":{"spark.sql.shuffle.partitions":"200"}}
{"Topic1WithPartiton1":{"0":21482917},"Topic2WithPartiton2":{"1":103557997,"0":103547910}}

所以,我认为对于监控偏移滞后,它需要开发具有以下功能的自定义工具。

  • 从 HDFS 的偏移量中读取。
  • 将偏移量写入 Kafka __offset 主题。

这样,已经存在的偏移滞后监控工具可以监控结构化流中的 kafka 的偏移滞后。

于 2017-09-18T09:06:57.430 回答
0

方法 1: 如果您已配置checkpointLocation(HDFS/S3 等),请转到路径,您将找到两个目录offsetscommits. 偏移量保存当前偏移量,而提交具有最后提交的偏移量。您可以导航到提交目录并打开最新修改的文​​件,您可以在其中找到最后提交的偏移量。而 offsets 目录中的最新文件包含消耗的偏移量信息。

方法二: 也可以通过如下配置进行监控:

class CustomStreamingQueryListener extends StreamingQueryListener with AppLogging {

  override def onQueryStarted(event: StreamingQueryListener.QueryStartedEvent): Unit = {
    logDebug(s"Started query with id : ${event.id}," +
      s" name: ${event.name},runId : ${event.runId}")
  }

  override def onQueryProgress(event: StreamingQueryListener.QueryProgressEvent): Unit = {
    val progress = event.progress
    logDebug(s"Streaming query made progress: ${progress.prettyJson}")
  }

  override def onQueryTerminated(event: StreamingQueryListener.QueryTerminatedEvent): Unit = {
    logDebug(s"Stream exited due to exception : ${event.exception},id : ${event.id}, " +
      s"runId: ${event.runId}")
  }

}

并将其添加到您的流配置中。

spark.streams.addListener(new CustomStreamingQueryListener())
于 2018-07-27T17:07:25.180 回答
-1

有几点需要注意:

监控:现成的监控可以在 Spark 作业的 Streaming 选项卡中找到。您可以看到正在处理的当前批次是什么,以及有多少队列正在检查滞后。

检查主题的最大和最小偏移量:您有 cli 来检查这些。可以从存在 kafka 代理的服务器中使用以下内容:

kafka-run-class \
kafka.tools.GetOffsetShell \
--broker-list your_broker1:port,your_broker2:port,your_broker3:port \
--topic your_topic \
--time -2

如果您与 Grafana 集成,可以获得更详细的信息

于 2017-04-29T00:38:57.733 回答