1

用例:

  • 生产者:向 Kafka 主题 db.inventory.customers 写入数据
  • ConsumerGroup1 (cg1):从 db.inventory.customers 读取,并写入 loader-b.inventory.customers
  • ConsumerGroup2 (cg2):从 loader-b.inventory.customers 读取,并写入 Github。

监控滞后并做一些工作

我们监控 cg1 和 cg2 的滞后。当两个消费者组的滞后都处于 0 <= lag <= 100 的范围内时,我们执行一些任务。

问题

问题是:对于吞吐量低的加载器主题,cg2 会消失,所以我们无法得知它的滞后,只能将其视为 -1。于是我们的条件永远得不到满足,流程就卡住了。

现在,如果我们把条件改为:cg1 满足 0 <= lag <= 100,且 cg2 满足 -1 <= lag <= 100,

那么,在 cg2 尚未创建的第一次运行中,条件就会被视为已满足。但我们不希望这样:我们希望它先完成一些工作,之后滞后才达到条件。

我应该怎么办?

代码

// consumerGroupLag reports how far consumer group id is behind the newest
// offset of the given topic partition.
//
// It reads the newest offset through t.client, asks broker for the offset
// the group has committed for the partition, and returns the difference.
//
// Results:
//   - (lag, nil) with lag >= 0 when a committed offset was found.
//   - (-1, nil) when the response holds no block for the partition, or the
//     group has no committed offset for it. -1 is the "lag unknown" sentinel.
//   - (-1, err) when a request fails or the response block carries a Kafka
//     error code.
//
// NOTE(review): OffsetFetch is presumably expected to be sent to the group's
// coordinator; this function uses whatever broker the caller passes — confirm
// at the call site.
func (t *kafkaWatch) consumerGroupLag(
    id string,
    topic string,
    partition int32,
    broker *sarama.Broker,
) (
    int64,
    error,
) {
    // Sentinel returned whenever the lag cannot be determined.
    const unknownLag = int64(-1)

    lastOffset, err := t.client.GetOffset(topic, partition, sarama.OffsetNewest)
    if err != nil {
        return unknownLag, fmt.Errorf("Error getting offset for topic partition: %s, err: %w", topic, err)
    }

    offsetFetchRequest := sarama.OffsetFetchRequest{
        ConsumerGroup: id,
        Version:       1,
    }
    offsetFetchRequest.AddPartition(topic, partition)

    // The broker may already be connected; that is not a failure.
    err = broker.Open(t.client.Config())
    if err != nil && err != sarama.ErrAlreadyConnected {
        return unknownLag, fmt.Errorf("Error opening broker connection again, err: %w", err)
    }

    offsetFetchResponse, err := broker.FetchOffset(&offsetFetchRequest)
    if err != nil {
        return unknownLag, fmt.Errorf(
            "Error fetching offset for offsetFetchRequest: %s %v, err: %w",
            topic, offsetFetchRequest, err)
    }
    if offsetFetchResponse == nil {
        return unknownLag, fmt.Errorf(
            "OffsetFetch request got no response for request: %+v",
            offsetFetchRequest)
    }

    // Indexing a nil inner map is safe in Go and yields a nil block, so a
    // missing topic, a missing partition and a nil entry all end up here.
    block := offsetFetchResponse.Blocks[topic][partition]
    if block == nil {
        klog.Warningf("%s for group is not active or present in Kafka", topic)
        return unknownLag, nil
    }

    // Check the error code before the offset: an error block usually also
    // carries offset -1, and must not be mistaken for "nothing committed".
    if block.Err != sarama.ErrNoError {
        return unknownLag, fmt.Errorf(
            "Error since offsetFetchResponseBlock.Err != sarama.ErrNoError for offsetFetchResponseBlock.Err: %w",
            block.Err)
    }

    // Kafka will return -1 if there is no offset associated
    // with a topic-partition under that consumer group
    if block.Offset == -1 {
        klog.V(4).Infof("%s not consumed by group: %v", topic, id)
        return unknownLag, nil
    }

    // lastOffset was read before the committed offset, so a commit landing in
    // between can make the difference negative. Clamp to 0 so a real lag is
    // never confused with the -1 sentinel.
    lag := lastOffset - block.Offset
    if lag < 0 {
        lag = 0
    }
    return lag, nil
}
4

0 回答 0