6

更新:原来我在 Docker 中的端口有问题。不知道为什么解决了这种现象。

我相信我遇到了一个奇怪的错误。我正在使用Sarama库,并且能够成功创建消费者。

func main() {
 config = sarama.NewConfig()
 config.ClientID = "go-kafka-consumer"
 config.Consumer.Return.Errors = true
 // Create new consumer
 master, err := sarama.NewConsumer("localhost:9092", config)
 if err != nil {
    panic(err)
 }

 defer func() {
     if err := master.Close(); err != nil {
         panic(err)
     }
 }()

 partitionConsumer, err := master.ConsumePartition("myTopic",0, 
 sarama.OffsetOldest)
 if err != nil {
     panic(err)
 }
}

一旦我将这段代码分解并移出主程序,我就会遇到错误:

kafka:客户端已经用完了可以与之交谈的代理(您的集群是否可达?)

我已将代码拆分如下​​:以前的 main() 方法现在已转换为使用名为 NewConsumer() 的方法的消费者包,并且我的新 main() 调用 NewConsumer() 如下所示:

c := consumer.NewConsumer()

恐慌声明被触发sarama.NewConsumer并打印出来kafka: client has run out of available brokers to talk to (Is your cluster reachable?)

为什么以这种方式分解我的代码会触发 Sarama 无法成为消费者?Sarama 是否需要直接从 main 运行?

4

1 回答 1

3

认为您以这种方式创建了 2 个或更多消费者,它们被分组为一个组(可能go-kafka-consumer)。您的代理有一个带有 1 个分区的主题,因此其中一个组被分配,另一个产生此错误消息。如果您将该主题的分区提高到 2,错误就会消失。但我认为你的问题是你以某种方式实例化了比以前更多的消费者。

卡夫卡简而言之

消费者也可以针对给定的主题组织成消费者组——组中的每个消费者都从一个唯一的分区中读取数据,并且整个组消费来自整个主题的所有消息。如果您的消费者多于分区,那么一些消费者将处于空闲状态,因为他们没有可读取的分区。如果您的分区比消费者多,那么消费者将收到来自多个分区的消息。如果您有相同数量的消费者和分区,则每个消费者都会从一个分区中按顺序读取消息。

他们不会完全产生错误,所以这将是 Sarama 的问题。

于 2019-04-26T13:08:49.910 回答