1

我的需要是让生产者从它崩溃前处理的最后一条消息开始。幸运的是,我只有一个主题,一个分区和一个消费者。

为此,我尝试了https://github.com/Shopify/sarama,但它似乎还不可用。我现在使用https://godoc.org/github.com/bsm/sarama-cluster,它允许我提交每个消息偏移量。

我无法检索最后提交的偏移量我无法弄清楚如何让 sarama消费者从所述偏移量开始。到目前为止,我发现的唯一参数是Config.Producer.Offsets.Initial.

  1. 如何检索最后提交的偏移量?
  2. 如何让消费者从最后一条提交了偏移量的消息开始?OffsetNewest将使它从产生的最后一条消息开始,而不是消费者最后处理的消息。
  3. 是否可以仅使用 Shopify/sarama 而不是 bsm/sarama-cluster 这样做?

预先感谢

PS我使用的是Kafka 10.0,所以偏移量是存储在kafka而不是zookeeper中。

EDIT1: 部分解决方案:获取自 sarama.OffsetOldest 以来的所有消息并跳过所有消息,直到我们找到未处理的消息。

4

1 回答 1

2

如果已为分区保存偏移量,则 sarama-cluster 将从该偏移量恢复消耗。仅当不存在保存的偏移量时才使用该Config.Producer.Offsets.Initial选项(消费者组首次运行)。

您可以通过在main()函数开头添加以下行来验证这一点:

sarama.Logger = log.New(os.Stdout, "sarama: ", log.LstdFlags)

然后你会在输出中看到类似下面的内容:

cluster/consumer CID-17db1be4-a162-411c-a106-4d198191176a consume sample/0 from 12

其中的 12 是 Sarama 将从该分区 (sample/0) 开始的偏移量。

于 2018-01-18T13:27:13.417 回答