在 confluent kafka rest proxy 中,我们可以获得特定消费者组的最后提交偏移量,但我们如何获得主题的最新偏移量来计算延迟。
问问题
1111 次
1 回答
0
您可以使用 Kafka REST 代理来获取为特定分区提交的最新偏移量。根据Confluent Docs,
GET /consumers/(string: group_name)/instances/(string: instance)/offsets
获取给定分区的最后提交的偏移量(无论提交是由该进程还是其他进程发生的)。
请注意,必须向持有消费者实例的特定 REST 代理实例发出此请求。
参数:
group_name (string) -- 消费组的名称
instance (string) -- 消费者实例请求 JSON 的 ID
对象数组:
- partitions -- 用于查找最后提交的偏移量的分区列表
- partitions[i].topic (string) -- 主题名称
- partitions[i].partition (int) -- 分区 ID
响应 JSON 对象数组:
- offsets -- 提交的偏移量列表
- offsets[i].topic (string) -- 提交偏移量的主题名称
- offsets[i].partition (int) -- 已提交偏移量的分区 ID
- offsets[i].offset (int) -- 提交的偏移量
- offsets[i].metadata (string) -- 已提交偏移量的元数据
状态码:
- 404 未找到 --
- 错误代码 40402 - 未找到分区
- 错误代码 40403 - 未找到使用者实例
示例请求:
GET /consumers/testgroup/instances/my_consumer/offsets HTTP/1.1
Host: proxy-instance.kafkaproxy.example.com
Accept: application/vnd.kafka.v2+json, application/vnd.kafka+json, application/json
{
"partitions": [
{
"topic": "test",
"partition": 0
},
{
"topic": "test",
"partition": 1
}
]
}
示例响应:
HTTP/1.1 200 OK
Content-Type: application/vnd.kafka.v2+json
{"offsets":
[
{
"topic": "test",
"partition": 0,
"offset": 21,
"metadata":""
},
{
"topic": "test",
"partition": 1,
"offset": 31,
"metadata":""
}
]
}
于 2019-07-11T13:35:34.460 回答