0

我有两个微服务,一个生产者和一个消费者。生产者每两秒向 kafka 主题写入一个数字增量。消费者有两个正在运行的实例,消耗这些增量。我注意到一些我想解决的奇怪问题:

  • 当还没有消费者,生产者开始生产时,消息被保存在 kafka 中。当消费者上线时,它不会处理生产者已经产生的已经存在的消息,而是开始消费现在进来的消息。消费者如何处理所有未消费的消息?
  • 当有两个消费者时,我希望两个消费者平等消费。现在只有一个消费者得到了所有的负载,而另一个只是坐在那里。如何将负载分散到消费者数量上?
  • 看起来 kafka 保存了所有正在生成的记录,即使它已经被消费者消费了。有什么办法可以防止这种情况发生吗?我找不到关于例如致谢的好信息。

有人知道这三个问题之一的答案吗?

消费者配置:

mp.messaging.incoming.stocks.connector=smallrye-kafka
mp.messaging.incoming.stocks.topic=stocks
mp.messaging.incoming.stocks.value.deserializer=org.apache.kafka.common.serialization.StringDeserializer
mp.messaging.incoming.stocks.group.id=test1
mp.messaging.incoming.stocks.auto.offset.reset=earliest

生产者配置:

mp.messaging.outgoing.stock-quote.connector=smallrye-kafka
mp.messaging.outgoing.stock-quote.topic=stocks
mp.messaging.outgoing.stock- 
quote.value.serializer=org.apache.kafka.common.serialization.StringSerializer
4

1 回答 1

2

Kafka 默认保存所有记录,因为它被归类为事件流引擎(实时消息传递 + 存储)。主题中消息保留的默认配置是 7 天(168 小时),并且可以使用主题配置retention.ms=10000(10 秒或您想要的任何值)进行更改。

对于并发消费,您需要确保您的主题已分区,因为主题分区是 Kafka 中的并行单元。我还记得 Quarkus/Smallrye 开发团队正在实施Kafka rebalance listener。如果这个与最新的 Quarkus 放在一起,那么剩下的部分就是划分你的主题。

为了在将分区分配给消费者之前使用消息,我们需要 Dev。团队输入在这里,我不能给这个建议。

于 2020-07-12T20:13:29.903 回答