我有三个问题:</p>
- “最旧的偏移量”是什么意思?最旧的偏移量并不意味着偏移量 0?
// OffsetOldest 代表代理上可用的最旧偏移量
// 分区。
OffsetOldest int64 = -2
假如
A. 三个代理在同一台机器上运行
B. 消费者组只有一个消费者线程
C. 消费者配置 OffsetOldest 标志。
D. 已经产生了 100 条消息,目前消费者线程已经消耗了 90 条消息。那么如果consumer线程重启了,那么这个consumer会从哪个offset开始消费呢?是 91 还是 0?
在我们下面的代码中,似乎每次启动消费者时都会重新使用消息。但实际上它并不总是发生。为什么重新启动后只会发生几次重复消费(不是全部)?
func (this *consumerGroupHandler) ConsumeClaim(session sarama.ConsumerGroupSession, claim sarama.ConsumerGroupClaim) error { for message := range claim.Messages() { this.handler(message) session.MarkMessage(message, "") } return nil } ctx := context.Background() conf := sarama.NewConfig() conf.Version = sarama.V2_0_0_0 conf.Consumer.Offsets.Initial = sarama.OffsetOldest conf.Consumer.Return.Errors = true consumer, err := sarama.NewConsumerGroup(strings.Split(app.Config().KafkaBrokers, ","), groupId, conf) if err != nil { logger.Error("NewConsumerGroupFromClient(%s) error: %v", groupId, err) return }