我正在使用sarama-cluster
lib 在后端服务中创建一个 kafka 组消费者。这个来自godoc的示例代码有效:
for {
if msg, ok := <-consumer.Messages(); ok {
fmt.Fprintf(os.Stdout, "%s/%d/%d\t%s\t%s\n", msg.Topic, msg.Partition, msg.Offset, msg.Key, msg.Value)
consumer.MarkOffset(msg, "") // mark message as processed
}
}
因为它是一个死循环,所以我把它放在一个 goroutine 中以避免阻塞其他活动,那么它就不能再消费任何消息:
go func() {
for msg := range consumer.Messages() {
fmt.Fprintf(os.Stdout, "%s/%d/%d\t%s\t%s\n", msg.Topic, msg.Partition, msg.Offset, msg.Key, msg.Value)
consumer.MarkOffset(msg, "") // mark message as processed
}
}()
(服务正在运行,所以这个 goroutine 没有被终止。它只是消费失败)
知道如何解决这个问题吗?