我正在尝试了解如何使用 Kafka 跟踪消息摄取。
我们现在遵循的工作流程是清除主题中的所有消息,然后我们通过代码更改重新摄取。我需要知道这些代码更改有多成功。在当前状态下,我正在使用 Kafka 工具并手动刷新消息总数,并将结果保存在 csv 中,我知道这是不可持续的长期。
您对自动获取 Kafka 主题中的消息计数有何建议?理想情况下,我想以一分钟一分钟的频率点击该主题并获得计数,以及像 1 天这样的时间窗口等。
*由于我们遇到的稳定性问题,我无法使用 KSQL。
我正在尝试了解如何使用 Kafka 跟踪消息摄取。
我们现在遵循的工作流程是清除主题中的所有消息,然后我们通过代码更改重新摄取。我需要知道这些代码更改有多成功。在当前状态下,我正在使用 Kafka 工具并手动刷新消息总数,并将结果保存在 csv 中,我知道这是不可持续的长期。
您对自动获取 Kafka 主题中的消息计数有何建议?理想情况下,我想以一分钟一分钟的频率点击该主题并获得计数,以及像 1 天这样的时间窗口等。
*由于我们遇到的稳定性问题,我无法使用 KSQL。
消息计数也取决于日志压缩。
例如,当日志压缩对一个主题有效时,您可能会观察到“奇怪的结果”。假设您有一个myTopic
总共有 100 条消息的主题。假设您有一个现在生效的日志压缩策略,该计数可能会减少到 20 条消息,因为旧消息已被压缩。
为了获取每个分区的消息计数,您可以使用以下命令:
kafka-run-class kafka.tools.GetOffsetShell \
--broker-list localhost:9092 \
--topic myTopic
结果将类似于下面的结果(假设myTopic
有 3 个分区):
myTopic:2:34
myTopic:1:33
myTopic:0:33
或者,对于总和,您可以使用这个:
./bin/kafka-run-class.sh kafka.tools.GetOffsetShell \
--broker-list localhost:2181 \
--topic myTopic \
--time -1 \
--offsets 1 \
| awk -F ":" '{sum += $3} END {print sum}'
在这种情况下,您可能还会发现一些有用的Kafka 监控工具。更准确地说,CMAK
(aka kafka-manager
)有一个关于Summed Recent Offsets的指标
https://stackoverflow.com/a/63191575/163585
要获取 kafka 中的消息数量:
brokers="<broker1:port>"
topic=<topic-name>
sum_1=$(/usr/hdp/current/kafka-broker/bin/kafka-run-class.sh kafka.tools.GetOffsetShell --broker-list $brokers --topic $topic --time -1 | grep -e ':[[:digit:]]*:' | awk -F ":" '{sum += $3} END {print sum}')
sum_2=$(/usr/hdp/current/kafka-broker/bin/kafka-run-class.sh kafka.tools.GetOffsetShell --broker-list $brokers --topic $topic --time -2 | grep -e ':[[:digit:]]*:' | awk -F ":" '{sum += $3} END {print sum}')
echo "Number of records in topic ${topic}: "$((sum_1 - sum_2))
其中 option --time -1
=> 当前最大偏移量 &--time -2
是当前最小偏移量。
kafka 中的消息数,在保留期之后,消息将从主题中删除,因此偏移量!= 消息数**