1

我有三个问题:</p>

  1. “最旧的偏移量”是什么意思?最旧的偏移量并不意味着偏移量 0?

// OffsetOldest 代表代理上可用的最旧偏移量
// 分区。
OffsetOldest int64 = -2

  1. 假如

    A. 三个代理在同一台机器上运行
    B. 消费者组只有一个消费者线程
    C. 消费者配置 OffsetOldest 标志。
    D. 已经产生了 100 条消息,目前消费者线程已经消耗了 90 条消息。

    那么如果consumer线程重启了,那么这个consumer会从哪个offset开始消费呢?是 91 还是 0?

  2. 在我们下面的代码中,似乎每次启动消费者时都会重新使用消息。但实际上它并不总是发生。为什么重新启动后只会发生几次重复消费(不是全部)?

     func (this *consumerGroupHandler) ConsumeClaim(session 
     sarama.ConsumerGroupSession, claim sarama.ConsumerGroupClaim) error {
              for message := range claim.Messages() {
              this.handler(message)
             session.MarkMessage(message, "")
        }
    
        return nil
    }
    
    ctx := context.Background()
    conf := sarama.NewConfig()
    
    conf.Version = sarama.V2_0_0_0
    conf.Consumer.Offsets.Initial = sarama.OffsetOldest
    conf.Consumer.Return.Errors = true
    
    consumer, err := sarama.NewConsumerGroup(strings.Split(app.Config().KafkaBrokers, ","), groupId, conf)
    if err != nil {
        logger.Error("NewConsumerGroupFromClient(%s) error: %v", groupId, err)
        return
    }
    
4

1 回答 1

2
  1. 不会。应用保留策略时,会从主题中删除较旧的消息。因此,最早的偏移量可能不是第一个偏移量(即0)。

  2. 这取决于您的配置。本质上,您有 3 个选项:

    • earliest偏移量开始消费
    • latest偏移量开始消费
    • 从特定偏移量开始消费
  3. 你必须使用sarama.OffsetOldest. 从文档中,

 const (
        // OffsetNewest stands for the log head offset, i.e. the offset that will be
        // assigned to the next message that will be produced to the partition. You
        // can send this to a client's GetOffset method to get this offset, or when
        // calling ConsumePartition to start consuming new messages.
        OffsetNewest int64 = -1
        // OffsetOldest stands for the oldest offset available on the broker for a
        // partition. You can send this to a client's GetOffset method to get this
        // offset, or when calling ConsumePartition to start consuming from the
        // oldest offset that is still available on the broker.
        OffsetOldest int64 = -2
    )
于 2019-07-16T14:02:25.770 回答