3

我在本地机器上设置了 Kafka,并启动了 zookeeper 和单个代理服务器。

现在我有一个带有以下描述的主题:

~/Documents/backups/kafka_2.12-2.2.0/data/kafka$ kafka-topics.sh --zookeeper 127.0.0.1:2181 --topic edu-topic --describe
Topic:edu-topic PartitionCount:3    ReplicationFactor:1 Configs:
    Topic: edu-topic    Partition: 0    Leader: 0   Replicas: 0 Isr: 0
    Topic: edu-topic    Partition: 1    Leader: 0   Replicas: 0 Isr: 0
    Topic: edu-topic    Partition: 2    Leader: 0   Replicas: 0 Isr: 0

我有一个生产者在消费者启动之前产生了一些消息,如下所示:

~/Documents/backups/kafka_2.12-2.2.0/data/kafka$ kafka-console-producer.sh --broker-list 127.0.0.1:9092 --topic edu-topic
>book 
>pen 
>pencil
>marker
>

当我使用 --from-beginning 选项启动消费者时,它不会显示生产者产生的所有消息:

~/Documents/backups/kafka_2.12-2.2.0/data/kafka$ kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic edu-topic --group edu-service --from-beginning

但是,它显示了新添加的消息。

我在这里做错了什么?有什么帮助吗?

4

4 回答 4

7

--from-beginning:如果消费者还没有确定的消费偏移量,则从日志中出现的最早消息而不是最新消息开始。

Kafka 消费者第一次使用 --from-beginning如果你重试我怀疑你做了,它将从它离开的地方开始。您可以使用以下任何选项再次使用该消息

  1. 使用以下重置消费者组偏移量

kafka-streams-application-reset.sh --application-id edu-service --input-topics edu-topic --bootstrap-servers localhost:9092 --zookeeper 127.0.0.1:2181

然后从头开始重试

kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic edu-topic --group edu-service --from-beginning

  1. 使用将从起点开始消费的新消费者 ID

kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic edu-topic --group new-edu-service --from-beginning

  1. 您也可以使用偏移量来使用分区中的下 N 条消息

kafka-console-consumer.sh --bootstrap-server localhost:9092 --offset 0 --partition 0 --topic edu-topic

--offset <String: consume offset> : 要消耗的偏移量 id(一个非负数),或 'earliest' 表示从头开始,或 'latest' 表示从结束(默认值:最新)
--partition <整数:partition> :除非指定了“--offset”,否则从 Consumption 消费的分区从分区的末尾开始。

于 2019-11-19T08:35:39.313 回答
2

国旗

--from-begining

将影响您的 GroupConsumer 第一次启动/创建时的行为,或者存储的(上次提交的消费)偏移量已过期(或者当您尝试重置存储的偏移量时)。

否则 GroupConsumer 将继续在存储的(最后提交的)偏移处。

请考虑从手册中获取更多信息。

于 2019-11-19T07:26:07.067 回答
2

因为您使用的是旧的消费者组。--from-beginning仅适用于其组名尚未记录在 Kafka 集群上的新消费者组。

要从头开始重新消费,您可以:

  • 使用标志启动一个新的消费者组(更改组名)--from-beginning
  • 重置此消费者组的偏移量。我还没试过,但你可以在这里测试
于 2019-11-19T08:09:15.097 回答
0

只需添加 --from-beginning

但要知道,从一开始的消息就不是有序的

如果您为同一主题使用了多个分区。订单仅在分区级别得到保证。(对于同一个分区)

于 2019-12-25T08:19:11.443 回答