我在 docker 上安装了 confluent kafka。在主题中,我有 10 个分区。问题是我无法使用来自该主题的消息,但我可以在该主题中生成消息。我正在尝试使用 C# confluent.kafka driver 1.5.1 (latest) 和 librd.kafka 1.5.0 (latest) 从主题中消费。
我启动kafka的docker-compose文件如下
version: '3'
services:
zookeeper:
image: confluentinc/cp-zookeeper
hostname: zookeeper
networks:
- bridge_network
ports:
- "3001:3001"
environment:
ZOOKEEPER_CLIENT_PORT: 3001
ZOOKEEPER_TICK_TIME: 3000
broker:
image: confluentinc/cp-kafka
hostname: broker
depends_on:
- zookeeper
ports:
- "3002:3002"
networks:
- bridge_network
environment:
KAFKA_BROKER_ID: 1
KAFKA_ZOOKEEPER_CONNECT: 'zookeeper:3001'
KAFKA_ADVERTISED_LISTENERS: 'PLAINTEXT://broker:3002'
KAFKA_AUTO_CREATE_TOPICS_ENABLE: 'true'
kafka_manager:
image: sheepkiller/kafka-manager
hostname: kafka_manager
depends_on:
- zookeeper
ports:
- '9000:9000'
networks:
- bridge_network
environment:
ZK_HOSTS: 'zookeeper:3001'
networks:
bridge_network:
driver: bridge
driver_opts:
com.docker.network.enable_ipv6: "false"
我在 C# 中的使用者配置如下:
var consumer = new ConsumerBuilder<string, string>(new Dictionary<string, string>
{
{ "bootstrap.servers", "PLAINTEXT://localhost:3002" },
{ "group.id", "some-test-group" },
{ "auto.offset.reset", "latest"},
{ "compression.codec", "gzip" },
{ "enable.auto.commit", "false" }
}).Build();
consumer.Subscribe("some-test-topic");
while (true)
{
var cr = consumer.Consume(30_000);
if (cr == null || cr.Message.Key == null || cr.Message.Value == null)
{
System.Console.WriteLine("that's it");
break;
}
System.Console.WriteLine(cr.Message.Key + ": " + cr.Message.Value);
}
我确定主题分区中有消息,因为我可以使用 kafka 工具 2.0 检查主题
我很确定我错过了配置文件中的某些内容,但是在阅读了 2 天的文档并将我的头撞到墙上后,我仍然找不到问题。那么有人可以帮忙吗?