1

我正在尝试了解如何使用 Kafka 跟踪消息摄取。

我们现在遵循的工作流程是清除主题中的所有消息,然后我们通过代码更改重新摄取。我需要知道这些代码更改有多成功。在当前状态下,我正在使用 Kafka 工具并手动刷新消息总数,并将结果保存在 csv 中,我知道这是不可持续的长期。

您对自动获取 Kafka 主题中的消息计数有何建议?理想情况下,我想以一分钟一分钟的频率点击该主题并获得计数,以及像 1 天这样的时间窗口等。

*由于我们遇到的稳定性问题,我无法使用 KSQL。

4

2 回答 2

2

消息计数也取决于日志压缩。


例如,当日志压缩对一个主题有效时,您可能会观察到“奇怪的结果”。假设您有一个myTopic总共有 100 条消息的主题。假设您有一个现在生效的日志压缩策略,该计数可能会减少到 20 条消息,因为旧消息已被压缩。


为了获取每个分区的消息计数,您可以使用以下命令:

kafka-run-class kafka.tools.GetOffsetShell \
  --broker-list localhost:9092 \
  --topic myTopic 

结果将类似于下面的结果(假设myTopic有 3 个分区):

myTopic:2:34 
myTopic:1:33 
myTopic:0:33

或者,对于总和,您可以使用这个:

./bin/kafka-run-class.sh kafka.tools.GetOffsetShell \
  --broker-list localhost:2181 \
  --topic myTopic \
  --time -1 \
  --offsets 1 \
  | awk -F  ":" '{sum += $3} END {print sum}'

在这种情况下,您可能还会发现一些有用的Kafka 监控工具。更准确地说,CMAK(aka kafka-manager)有一个关于Summed Recent Offsets的指标

于 2020-04-21T14:02:53.837 回答
0

https://stackoverflow.com/a/63191575/163585

要获取 kafka 中的消息数量:

brokers="<broker1:port>"
topic=<topic-name>
sum_1=$(/usr/hdp/current/kafka-broker/bin/kafka-run-class.sh kafka.tools.GetOffsetShell --broker-list $brokers --topic $topic --time -1 | grep -e ':[[:digit:]]*:' | awk -F  ":" '{sum += $3} END {print sum}')
sum_2=$(/usr/hdp/current/kafka-broker/bin/kafka-run-class.sh kafka.tools.GetOffsetShell --broker-list $brokers --topic $topic --time -2 | grep -e ':[[:digit:]]*:' | awk -F  ":" '{sum += $3} END {print sum}')
echo "Number of records in topic ${topic}: "$((sum_1 - sum_2))

其中 option --time -1=> 当前最大偏移量 &--time -2是当前最小偏移量。

kafka 中的消息数,在保留期之后,消息将从主题中删除,因此偏移量!= 消息数**

于 2020-07-31T12:11:42.230 回答