0

我在 spring-boot 中有一个工作人员,它用 20 个分区收听 kafka 主题。我创建了以下侦听器:

@KafkaListener(topics = "mytopic")
public void listen(@Payload(required = true) IncomingMessage msg, 
        @Header(KafkaHeaders.RECEIVED_PARTITION_ID) int partition,
        @Header(KafkaHeaders.OFFSET) long offset) {

在这个监听器中,我打印了分区,发现我所有的消息都来自同一个分区号 - 17。这意味着生产者端正在将所有消息写入同一个分区。

我的容器工厂是ConcurrentKafkaListenerContainerFactory,所以我希望能够同时处理多个事件,这意味着我需要不同的分区来在其中也有事件..

Linux机器中的生产者端是kafkacat,它在这个主题中产生事件。似乎 kafkacat 无法以循环方式发送到分区。

问题是,我必须使用一些 CLI 工具来生成事件而不是服务。有没有办法使用 CLI 工具来解决这个问题?我找不到任何执行此操作的 cli 工具。

我想过在文件中维护一个分区号,并读取它,然后在 kafkacat 命令中提供分区号,然后增加 (partition-number)%partitions,但这不是线程安全的。

注意:主题没有Key,我只产生Value消息,不关心Key。

4

2 回答 2

1

正如我之前评论的,在github 上找到了这个解决方案- 只需将此参数添加到 kafkacat:

-X topic.partitioner=murmur2_random
于 2020-03-05T08:02:43.187 回答
0

正如所评论的, kafkacat 应该遵循与任何其他 Kafka 客户端相同的 murmur2 / 循环策略。

当然,非常欢迎您编写自己的 Spring Boot CLI 工具来执行您自己的逻辑

于 2020-03-04T15:14:51.513 回答