2

我有一个 Kafka 应用程序,我一直在使用 kafka-console-consumer.sh 来消费消息,如下所示:

$./kafka-console-consumer.sh --zookeeper zookeeperhost:2181 --topic myTopic

它提供了我通过 Kafka 消费者写入 Kafka 代理的所有消息,没有任何遗漏。

最近,我将应用程序部署在无法访问 zookeeperhost 的不同环境中(由于某种原因)。所以我使用 kafka-simple-consumer-shell.sh 代替如下:

$./kafka-simple-consumer-shell.sh --broker-list brokerhost:9092 --topic myTopic --partition 0 --max-messages 1

但是有了这个,我看到很少有消息(大约 5000 条中的 2-4 条)丢失。有人可以解释一下 kafka-simple-consumer-shell.sh 如何读取消息。

我怀疑有些消息可能会发送到某个不同的分区,因为我只是从分区 0 读取,所以我不会每次都收到所有消息。但是我不知道如何检查有多少个分区?其他分区的 id 是什么?我试过 1 但它不起作用。

有人可以帮忙吗。

4

1 回答 1

3

kafka-simple-consumer.sh只需创建一个从一个分区读取消息的消费者。partition 0 of myTopic因此,您的命令只是从中读取一条消息brokerhost:9092。如果分区 1 不在同一个代理中,它将无法像您所做的那样工作。(有关更多信息,请查看来自 GitHub 的代码

如果您可以访问 Zookeeper 主机,您可以简单地检查分区在集群中的分布情况

bin/kafka-topics.sh --describe --zookeeper zookeeperhost:2181 --topic myTopic

但是如果你无法访问Zookeeper主机,我能想到的有两种方法。

  1. 提供一个将所有代理作为参数的列表,并尝试从 0 到 N 的分区号​​。您可以提供多个代理,--broker-list格式为broker1:port2,broker2:port2,broker3:port3. 然后就可以算出整个集群有多少个partition,但是还是不知道哪个broker有哪个partition。
  2. 手动检查每个代理的日志目录。检查/tmp/kafka-logs(如果您使用的是默认日志目录)。您会发现像myTopic-0, myTopic-1, ... 这样的目录格式为topic-partition#. 您可以使用它手动检查哪个代理具有哪些分区。
于 2015-03-24T05:00:28.843 回答