版本:GoLang 1.10.2 Kafka 4.4.1 Docker 18.03.1
我正在尝试使用 Shopify 的 Sarama 包来测试我的 Kafka 实例。我使用 Docker compose 来支持 Kafka/Zookeeper,它都可以成功运行。
当我尝试使用 Sarama 创建 Producer 客户端时,会引发错误。
当我运行以下
package main
import (
"fmt"
"log"
"os"
"os/signal"
"time"
"strconv"
"github.com/Shopify/sarama"
)
func main() {
// Setup configuration
config := sarama.NewConfig()
config.Producer.Return.Successes = true
config.Producer.Partitioner = sarama.NewRandomPartitioner
config.Producer.RequiredAcks = sarama.WaitForAll
brokers := []string{"localhost:29092"}
producer, err := sarama.NewAsyncProducer(brokers, config)
if err != nil {
// Should not reach here
panic(err)
}
defer func() {
if err := producer.Close(); err != nil {
// Should not reach here
panic(err)
}
}()
我明白了
[sarama] 2018/06/12 17:22:05 初始化新客户端
[sarama] 2018/06/12 17:22:05 客户端/元数据从代理 localhost:29092 获取所有主题的元数据
[sarama] 2018/06/12 17:22:05 在 localhost:29092 连接到代理(未注册)
[sarama] 2018/06/12 17:22:05 客户端/元数据在获取元数据时从代理收到错误:EOF
[sarama] 2018/06/12 17:22:05 关闭与代理 localhost:29092 的连接
{sarama] 2018/06/12 17:22:05 客户端/元数据没有可用的代理发送元数据请求
[sarama] 2018/06/12 17:22:06 Closing Client panic: kafka: client has run out of available brokers to talk(你的集群是否可达?)
goroutine 1 [运行]:main.main() /Users/benwornom/go/src/github.com/acstech/doppler-events/testprod/main.go:29 +0x3ec 退出状态 2
Sarama 确实连续多次尝试创建生产者客户端,但每次都失败了。
我对 Sarama 的“NewAsyncProducer”方法的理解是,它调用了“NewClient”,不管你是创建Producer 还是Consumer,都会调用它。NewClient 尝试从 Kafka 代理收集元数据,这在我的情况下失败了。我知道它正在连接到 Kafka 代理,但是一旦连接,它似乎就中断了。任何意见将是有益的。我的网络连接很强,我想不出任何干扰服务器的东西。据我所知,现有主题只有一个代理和一个分区。我认为我不必手动将主题分配给经纪人。如果我的客户正在与代理连接,为什么我不能为我的生产者建立持久连接?
这是来自 kafka 日志文件,就在它死之前。
__consumer_offsets-5 -> Vector(1), connect-offsets-23 -> Vector(1), __consumer_offsets-43 -> Vector(1), __consumer_offsets-32 -> Vector(1), __consumer_offsets-21 -> Vector(1) ), __consumer_offsets-10 -> Vector(1), connect-offsets-20 -> Vector(1), __consumer_offsets-37 -> Vector(1), connect-offsets-9 -> Vector(1), connect-status- 4 -> Vector(1),__consumer_offsets-48 -> Vector(1),__consumer_offsets-40 -> Vector(1),__consumer_offsets-29 -> Vector(1),__consumer_offsets-18 -> Vector(1),连接- offsets-14 -> Vector(1), __consumer_offsets-7 -> Vector(1), __consumer_offsets-34 -> Vector(1), __consumer_offsets-45 -> Vector(1), __consumer_offsets-23 -> Vector(1), connect-offsets-6 -> Vector(1),connect-status-1 -> Vector(1),connect-offsets-17 -> Vector(1),connect-offsets-0 -> Vector(1),connect-offsets-22 -> Vector(1), __consumer_offsets-26 -> Vector(1), connect-offsets-11 -> Vector(1), __consumer_offsets-15 -> Vector(1), __consumer_offsets-4 -> 向量(1), __consumer_offsets-42 -> Vector(1), __consumer_offsets-9 -> Vector(1), __consumer_offsets-31 -> Vector(1), __consumer_offsets-20 -> Vector(1), connect-offsets-3 - > Vector(1), __consumer_offsets-1 -> Vector(1), __consumer_offsets-12 -> Vector(1), connect-offsets-8 -> Vector(1), connect-offsets-19 -> Vector(1), connect-status-3 -> Vector(1), __confluent.support.metrics-0 -> Vector(1), __consumer_offsets-17 -> Vector(1), __consumer_offsets-28 -> Vector(1), __consumer_offsets-6 - > Vector(1), __consumer_offsets-39 -> Vector(1), __consumer_offsets-44 -> Vector(1), connect-offsets-16 -> Vector(1), connect-status-0 ->Vector(1), connect-offsets-5 -> Vector(1), connect-offsets-21 -> Vector(1), __consumer_offsets-47 -> Vector(1), __consumer_offsets-36 -> Vector(1), __consumer_offsets -14 -> Vector(1), __consumer_offsets-25 -> Vector(1), __consumer_offsets-3 -> Vector(1), __consumer_offsets-30 -> Vector(1), __consumer_offsets-41 -> Vector(1), 连接-offsets-13 -> Vector(1),connect-offsets-24 -> Vector(1),connect-offsets-2 -> Vector(1),connect-configs-0 -> Vector(1),__consumer_offsets-11 -> Vector(1), __consumer_offsets-22 -> Vector(1), __consumer_offsets-33 -> Vector(1), __consumer_offsets-0 -> Vector(1), connect-offsets-7 -> Vector(1), 连接-offsets-18 -> Vector(1))) (kafka.controller.KafkaController) [36mkafka_1 |[0m [2018-06-12 20:24:47,461]调试[控制器ID = 1]主题不在代理1 Map()的首选副本中(kafka.controller.KafkaController)[36mkafka_1 | [0m [2018-06-12 20:24:47,462]跟踪[控制器ID = 1 ] broker 1 的领导者不平衡比率为 0.0 (kafka.controller.KafkaController)