1

我正在使用 Docker 启动一个 kafka 代理集群(例如,5 个代理,每个容器一个代理)。Kafka 版本 2.12-0.11.0.0,Zookeeper 3.4.10。

场景:

  • 使用以下配置启动第一个代理

动物园.cfg

tickTime=2000
initLimit=10
syncLimit=5

dataDir=/opt/zookeeper/data

clientPort=2181
maxClientCnxns=10
minSessionTimeout=4000
maxSessionTimeout=1000000
server.1=0.0.0.0:2888:3888

服务器属性

broker.id=1
listeners=PLAINTEXT://:9092
advertised.listeners=PLAINTEXT://broker1_IP:broker1_PORT
num.network.threads=3
num.io.threads=8
socket.send.buffer.bytes=102400
socket.receive.buffer.bytes=102400
socket.request.max.bytes=104857600
log.dirs=/tmp/kafka-logs
num.partitions=1
num.recovery.threads.per.data.dir=1
offsets.topic.replication.factor=1
transaction.state.log.replication.factor=1
transaction.state.log.min.isr=1
log.retention.hours=168
log.segment.bytes=1073741824
log.retention.check.interval.ms=300000
zookeeper.connect=127.0.0.1:2181
zookeeper.session.timeout.ms=6000
zookeeper.connection.timeout.ms=1000000
group.initial.rebalance.delay.ms=0

生产者属性

bootstrap.servers=localhost:9092
compression.type=none

消费者属性

zookeeper.connect=127.0.0.1:2181
zookeeper.session.timeout.ms=6000
zookeeper.connection.timeout.ms=1000000
group.id=test-consumer-group
  • Zookeeper 以独立模式启动,然后启动 kafka

  • 创建主题

/opt/kafka/bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic my-test-topic1

  • 发送消息

echo "test_kafka1" | /opt/kafka/bin/kafka-console-producer.sh --broker-list localhost:9092 --topic my-test-topic1

  • 检查消息

/opt/kafka/bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --from-beginning --topic my-test-topic1 --max-messages 1

收到消息

  • 描述主题

/opt/kafka/bin/kafka-topics.sh --describe --zookeeper localhost:2181 --topic my-test-topic1 Topic:my-test-topic1 PartitionCount:1 ReplicationFactor:1 Configs: Topic: my-test-topic1 Partition: 0 Leader: 1 Replicas: 1 Isr: 1

  • 开始休息 4 个经纪人

zoo.cfg从第 1 到第 5 的每个经纪人(只有 0.0.0.0:2888:3888 位置不同)

tickTime=2000
initLimit=10
syncLimit=5

dataDir=/opt/zookeeper/data

clientPort=2181
maxClientCnxns=10
minSessionTimeout=4000
maxSessionTimeout=1000000
server.1=0.0.0.0:2888:3888
server.2=broker2_IP:broker2_2888:broker2_3888
server.3=broker3_IP:broker3_2888:broker3_3888
server.4=broker4_IP:broker4_2888:broker4_3888
server.5=broker5_IP:broker5_2888:broker5_3888

从第 1 个到第 5 个代理上的server.properties(broker.id 是唯一的,broker_IP:broker_PORT 对于 ech 代理不同)

broker.id=N
listeners=PLAINTEXT://:9092
advertised.listeners=PLAINTEXT://broker_IP:broker_PORT
num.network.threads=3
num.io.threads=8
socket.send.buffer.bytes=102400
socket.receive.buffer.bytes=102400
socket.request.max.bytes=104857600
log.dirs=/tmp/kafka-logs
num.partitions=1
num.recovery.threads.per.data.dir=1
offsets.topic.replication.factor=1
transaction.state.log.replication.factor=1
transaction.state.log.min.isr=1
log.retention.hours=168
log.segment.bytes=1073741824
log.retention.check.interval.ms=300000
zookeeper.connect=127.0.0.1:2181
zookeeper.session.timeout.ms=6000
zookeeper.connection.timeout.ms=1000000
group.initial.rebalance.delay.ms=0

从第 1 到第 5 的每个经纪人的producer.properties

bootstrap.servers=localhost:9092
compression.type=none

从第 1 到第 5 的每个经纪人的consumer.properties

zookeeper.connect=127.0.0.1:2181
zookeeper.session.timeout.ms=6000
zookeeper.connection.timeout.ms=1000000
group.id=test-consumer-group
  • 重启每个broker上的zookeeper使zoo.cfg生效

  • 动物园管理员聚集成集群

  • 话题转移到经纪人 5

/opt/kafka/bin/kafka-topics.sh --describe --zookeeper localhost:2181 --topic my-test-topic1 Topic:my-test-topic1 PartitionCount:1 ReplicationFactor:1 Configs: Topic: my-test-topic1 Partition: 0 Leader: 5 Replicas: 5 Isr: 5

这是正常行为吗?还是应该留在经纪人 1 上?

  • 检查每个代理上的消息

/opt/kafka/bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --from-beginning --topic my-test-topic1 --max-messages 1

消息丢失(topic停留在broker 1时消息不丢失,为浮动情况)

4

2 回答 2

0

您是否尝试将滴答时间提高到 6000?根据 Hadoop 的设置,他们默认使用此设置,说明 2000 毫秒设置太低。我认为这同样适用于这里。我现在正在处理一个非常相似的 kafka 问题。

于 2018-05-01T16:24:17.060 回答
0

在 Kafka 文档中,配置描述配置示例都建议在 broker 中指定所有 zookeeper 服务器zookeeper.connect。同样在生产中,您希望您运行一个单独的 Zookeeper 集群和一个单独的 Kafka 集群,而不是在一个 docker 容器中共同运行 Kafka 和 ZK。

我想可能会发生这样的事情:

  • 由于您如何重新启动 docker 容器的一些细节,ZKs 2-5 不知道 Kafka 1 在 ZK 1 中创建了一个描述您的测试主题的 znode 具有“副本:1,ISR:1”,或者不同意使用 ZK 1 版本,因为没有法定人数
  • 容器的一些子集 2-5 启动并且 5 个 ZK 中的 3 个在那里形成一个仲裁而不等待 ZK 1
  • 某些东西(消费者或命令行工具或代理自动创建)尝试使用该主题,并且由于 ZK quorum 同意它尚不存在,因此创建它并将副本分配给当前可用的代理之一(在这种情况下为 5 个) .
  • 容器 1 启动,ZK 1 必须放弃其主题 znode 的版本以支持 quorum,Kafka 必须放弃其副本以支持当前描述。

我不确定从单节点 Zookeeper 移动到复制设置的正确方法是什么,并且在文档中找不到它。也许您必须最初weight为您的第一个 ZK 分配更多,这样您才能保证它成为领导者并在其他 ZK 节点上强制其主题配置。

您是否创建了 JIRA 问题?有开发者的回应吗?

于 2019-03-22T11:09:37.683 回答