0

我正在使用sarama-clusterlib 在后端服务中创建一个 kafka 组消费者。这个来自godoc的示例代码有效:

for {
    if msg, ok := <-consumer.Messages(); ok {
        fmt.Fprintf(os.Stdout, "%s/%d/%d\t%s\t%s\n", msg.Topic, msg.Partition, msg.Offset, msg.Key, msg.Value)
        consumer.MarkOffset(msg, "") // mark message as processed
    }
}

因为它是一个死循环,所以我把它放在一个 goroutine 中以避免阻塞其他活动,那么它就不能再消费任何消息:

go func() {
    for msg := range consumer.Messages() {
        fmt.Fprintf(os.Stdout, "%s/%d/%d\t%s\t%s\n", msg.Topic, msg.Partition, msg.Offset, msg.Key, msg.Value)
        consumer.MarkOffset(msg, "") // mark message as processed
    }
}()

(服务正在运行,所以这个 goroutine 没有被终止。它只是消费失败)

知道如何解决这个问题吗?

4

1 回答 1

0

sarama-cluster已在相当长的一段时间内处于无人维护状态,并发出以下通知

请注意,由于https://github.com/Shopify/sarama/pull/1099已合并并发布 (>= v1.19.0),此库已正式弃用。本机实现支持此库无法提供的各种用例。

我建议您改用github.com/Shopify/sarama。它具有所有功能,sarama-cluster并且得到积极维护。

您可以从他们的存储库中遵循一个简单的消费者组示例。

于 2020-04-16T13:21:59.983 回答