0

我已经阅读了很多类似的主题,但他们无法在这里回答我的问题。

尝试运行一些简短的集成测试,我正在使用 docker-compose 3,一个单节点 kafka。在客户端,我使用 Go shopify/sarama 来消费/生产

zookeeper:
  image: confluentinc/cp-zookeeper:5.2.2
  hostname: zookeeper
  container_name: zookeeper
  ports:
    - "2181:2181"
  environment:
    ZOOKEEPER_CLIENT_PORT: 2181
    ZOOKEEPER_TICK_TIME: 2000
kafka:
  image: confluentinc/cp-enterprise-kafka:5.2.2
  hostname: kafka
  container_name: kafka
  depends_on:
    - zookeeper
  ports:
    - "29092:29092"
  expose:
    - 9092
  environment:
    KAFKA_BROKER_ID: 1
    KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
    KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT
    KAFKA_LISTENERS: PLAINTEXT://kafka:9092,PLAINTEXT_HOST://localhost:29092
    KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://kafka:9092,PLAINTEXT_HOST://localhost:29092
    KAFKA_INTER_BROKER_LISTENER_NAME: PLAINTEXT
    KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1

我有另一个来自 docker-compose 的容器,它将收听

- "BROKERS_URL=kafka:9092"

消费者工作得很好:

Sarama 消费者启动并运行。{“经纪人”:[“kafka:9092”],“主题”:[“已验证”],“组”:“事件服务”}

但在生产者部分,直接从我的机器上运行:

kafka:客户端已经用完了可以与之交谈的代理(您的集群是否可达?)

producer, err := sarama.NewSyncProducer([]string{"http://localhost:29092"}, nil)
...
msg := &sarama.ProducerMessage{
    Topic: "validated",
    Key:   sarama.StringEncoder(""),
    Value: sarama.ByteEncoder(payload),
}

partition, offset, err := producer.SendMessage(msg)
...

这里没有什么奇怪/奢侈的,但它不起作用,我很困惑。

还: nc -vz localhost 29092

连接到 localhost 端口 29092 [tcp/*] 成功!

4

1 回答 1

0

代替

    KAFKA_LISTENERS: PLAINTEXT://kafka:9092,PLAINTEXT_HOST://localhost:29092

你需要

    KAFKA_LISTENERS: PLAINTEXT://kafka:9092,PLAINTEXT_HOST://0.0.0.0:29092

使用我的主机测试连接kafkacat表明这有效:

➜ kafkacat -b localhost:29092 -L
Metadata for all topics (from broker 1: localhost:29092/1):
 1 brokers:
  broker 1 at localhost:29092 (controller)
 0 topics:

这种区别在于侦听器绑定到所有可用的接口 ( 0.0.0.0)。使用您的原始配置,它绑定到环回接口 ( lo) for localhost,因此仅接受此流量而不接受外部流量。

于 2020-03-12T13:43:46.130 回答