用例:
- 生产者:在主题上写给 Kafka
db.inventory.customers
- ConsumerGroup1 (cg1):读取
db.inventory.customers
和写入loader-b.inventory.customers
- ConsumerGroup2 (cg2):读取
loader-b.inventory.customers
和写入 Github。
监控滞后并做一些工作
我们监控 cg1 滞后和 cg2 滞后。当延迟在0 <= lag <= 100
两个消费者组的范围内时,我们执行一些任务。
问题
问题是对于吞吐量低的加载器主题,cg2 消失了,所以我们不知道它的滞后并将其视为 -1。我们的条件从未得到满足,我们被困住了。
现在,如果我们考虑
0 <= lag <= 100
cg1 和-1 <= lag <= 100
cg2的条件
然后,在没有创建 cg2 的第一次运行中,它会考虑满足条件。但我们不希望这样。我们希望它做一些工作,然后滞后应该达到条件。
我应该怎么办?
代码
func (t *kafkaWatch) consumerGroupLag(
id string,
topic string,
partition int32,
broker *sarama.Broker,
) (
int64,
error,
) {
defaultLag := int64(-1)
lastOffset, err := t.client.GetOffset(topic, partition, sarama.OffsetNewest)
if err != nil {
return defaultLag, fmt.Errorf("Error getting offset for topic partition: %s, err: %v", topic, err)
}
offsetFetchRequest := sarama.OffsetFetchRequest{
ConsumerGroup: id,
Version: 1,
}
offsetFetchRequest.AddPartition(topic, partition)
err = broker.Open(t.client.Config())
if err != nil && err != sarama.ErrAlreadyConnected {
return defaultLag, fmt.Errorf("Error opening broker connection again, err: %v", err)
}
offsetFetchResponse, err := broker.FetchOffset(&offsetFetchRequest)
if err != nil {
return defaultLag, fmt.Errorf(
"Error fetching offset for offsetFetchRequest: %s %v, err: %v",
topic, offsetFetchRequest, err)
}
if offsetFetchResponse == nil {
return defaultLag, fmt.Errorf(
"OffsetFetch request got no response for request: %+v",
offsetFetchRequest)
}
for topicInResponse, partitions := range offsetFetchResponse.Blocks {
if topicInResponse != topic {
continue
}
for partitionInResponse, offsetFetchResponseBlock := range partitions {
if partition != partitionInResponse {
continue
}
// Kafka will return -1 if there is no offset associated
// with a topic-partition under that consumer group
if offsetFetchResponseBlock.Offset == -1 {
klog.V(4).Infof("%s not consumed by group: %v", topic, id)
return defaultLag, nil
}
if offsetFetchResponseBlock.Err != sarama.ErrNoError {
return defaultLag, fmt.Errorf(
"Error since offsetFetchResponseBlock.Err != sarama.ErrNoError for offsetFetchResponseBlock.Err: %+v",
offsetFetchResponseBlock.Err)
}
return lastOffset - offsetFetchResponseBlock.Offset, nil
}
}
klog.Warningf("%s for group is not active or present in Kafka", topic)
return defaultLag, nil
}