1

版本:GoLang 1.10.2 Kafka 4.4.1 Docker 18.03.1

我正在尝试使用 Shopify 的 Sarama 包来测试我的 Kafka 实例。我使用 Docker compose 来支持 Kafka/Zookeeper,它都可以成功运行。

当我尝试使用 Sarama 创建 Producer 客户端时,会引发错误。

当我运行以下

    package main

import (
"fmt"
"log"
"os"
"os/signal"
"time"

"strconv"

"github.com/Shopify/sarama"

)

func main() {


// Setup configuration
config := sarama.NewConfig()
config.Producer.Return.Successes = true
config.Producer.Partitioner = sarama.NewRandomPartitioner
config.Producer.RequiredAcks = sarama.WaitForAll
brokers := []string{"localhost:29092"}
producer, err := sarama.NewAsyncProducer(brokers, config)
if err != nil {
    // Should not reach here
    panic(err)
}

defer func() {
    if err := producer.Close(); err != nil {
        // Should not reach here
        panic(err)
    }
}()

我明白了

[sarama] 2018/06/12 17:22:05 初始化新客户端

[sarama] 2018/06/12 17:22:05 客户端/元数据从代理 localhost:29092 获取所有主题的元数据

[sarama] 2018/06/12 17:22:05 在 localhost:29092 连接到代理(未注册)

[sarama] 2018/06/12 17:22:05 客户端/元数据在获取元数据时从代理收到错误:EOF

[sarama] 2018/06/12 17:22:05 关闭与代理 localhost:29092 的连接

{sarama] 2018/06/12 17:22:05 客户端/元数据没有可用的代理发送元数据请求

[sarama] 2018/06/12 17:22:06 Closing Client panic: kafka: client has run out of available brokers to talk(你的集群是否可达?)

goroutine 1 [运行]:main.main() /Users/benwornom/go/src/github.com/acstech/doppler-events/testprod/main.go:29 +0x3ec 退出状态 2

Sarama 确实连续多次尝试创建生产者客户端,但每次都失败了。

我对 Sarama 的“NewAsyncProducer”方法的理解是,它调用了“NewClient”,不管你是创建Producer 还是Consumer,都会调用它。NewClient 尝试从 Kafka 代理收集元数据,这在我的情况下失败了。我知道它正在连接到 Kafka 代理,但是一旦连接,它似乎就中断了。任何意见将是有益的。我的网络连接很强,我想不出任何干扰服务器的东西。据我所知,现有主题只有一个代理和一个分区。我认为我不必手动将主题分配给经纪人。如果我的客户正在与代理连接,为什么我不能为我的生产者建立持久连接?

这是来自 kafka 日志文件,就在它死之前。

__consumer_offsets-5 -> Vector(1), connect-offsets-23 -> Vector(1), __consumer_offsets-43 -> Vector(1), __consumer_offsets-32 -> Vector(1), __consumer_offsets-21 -> Vector(1) ), __consumer_offsets-10 -> Vector(1), connect-offsets-20 -> Vector(1), __consumer_offsets-37 -> Vector(1), connect-offsets-9 -> Vector(1), connect-status- 4 -> Vector(1),__consumer_offsets-48 -> Vector(1),__consumer_offsets-40 -> Vector(1),__consumer_offsets-29 -> Vector(1),__consumer_offsets-18 -> Vector(1),连接- offsets-14 -> Vector(1), __consumer_offsets-7 -> Vector(1), __consumer_offsets-34 -> Vector(1), __consumer_offsets-45 -> Vector(1), __consumer_offsets-23 -> Vector(1), connect-offsets-6 -> Vector(1),connect-status-1 -> Vector(1),connect-offsets-17 -> Vector(1),connect-offsets-0 -> Vector(1),connect-offsets-22 -> Vector(1), __consumer_offsets-26 -> Vector(1), connect-offsets-11 -> Vector(1), __consumer_offsets-15 -> Vector(1), __consumer_offsets-4 -> 向量(1), __consumer_offsets-42 -> Vector(1), __consumer_offsets-9 -> Vector(1), __consumer_offsets-31 -> Vector(1), __consumer_offsets-20 -> Vector(1), connect-offsets-3 - > Vector(1), __consumer_offsets-1 -> Vector(1), __consumer_offsets-12 -> Vector(1), connect-offsets-8 -> Vector(1), connect-offsets-19 -> Vector(1), connect-status-3 -> Vector(1), __confluent.support.metrics-0 -> Vector(1), __consumer_offsets-17 -> Vector(1), __consumer_offsets-28 -> Vector(1), __consumer_offsets-6 - > Vector(1), __consumer_offsets-39 -> Vector(1), __consumer_offsets-44 -> Vector(1), connect-offsets-16 -> Vector(1), connect-status-0 ->Vector(1), connect-offsets-5 -> Vector(1), connect-offsets-21 -> Vector(1), __consumer_offsets-47 -> Vector(1), __consumer_offsets-36 -> Vector(1), __consumer_offsets -14 -> Vector(1), __consumer_offsets-25 -> Vector(1), __consumer_offsets-3 -> Vector(1), __consumer_offsets-30 -> Vector(1), __consumer_offsets-41 -> Vector(1), 连接-offsets-13 -> Vector(1),connect-offsets-24 -> Vector(1),connect-offsets-2 -> Vector(1),connect-configs-0 -> Vector(1),__consumer_offsets-11 -> Vector(1), __consumer_offsets-22 -> Vector(1), __consumer_offsets-33 -> Vector(1), __consumer_offsets-0 -> Vector(1), connect-offsets-7 -> Vector(1), 连接-offsets-18 -> Vector(1))) (kafka.controller.KafkaController) [36mkafka_1 |[0m [2018-06-12 20:24:47,461]调试[控制器ID = 1]主题不在代理1 Map()的首选副本中(kafka.controller.KafkaController)[36mkafka_1 | [0m [2018-06-12 20:24:47,462]跟踪[控制器ID = 1 ] broker 1 的领导者不平衡比率为 0.0 (kafka.controller.KafkaController)

4

1 回答 1

0

我遇到了同样的问题,我做了以下解决我的问题:

  1. 检查您使用的 Kafka 版本。传递给时在配置中指定的默认 Kafka 版本NewAsyncProducerV0_8_2_0. 确保在配置中指定正确的 Kafka 版本:

    config := sarama.NewConfig()
    config.Version = sarama.V1_1_0_0
    
  2. 确保您传递的代理 URL 正确。您应该传递代理 URL,而不是 zookeeper URL 或其他 URL。默认的 kafka 端口是这样的,如果您使用默认端口9092,URL 应该类似于。BROKER_URL:9092

于 2018-11-25T20:52:23.997 回答