我在 spring-boot 中有一个工作人员,它用 20 个分区收听 kafka 主题。我创建了以下侦听器:
@KafkaListener(topics = "mytopic")
public void listen(@Payload(required = true) IncomingMessage msg,
@Header(KafkaHeaders.RECEIVED_PARTITION_ID) int partition,
@Header(KafkaHeaders.OFFSET) long offset) {
在这个监听器中,我打印了分区,发现我所有的消息都来自同一个分区号 - 17。这意味着生产者端正在将所有消息写入同一个分区。
我的容器工厂是ConcurrentKafkaListenerContainerFactory,所以我希望能够同时处理多个事件,这意味着我需要不同的分区来在其中也有事件..
Linux机器中的生产者端是kafkacat,它在这个主题中产生事件。似乎 kafkacat 无法以循环方式发送到分区。
问题是,我必须使用一些 CLI 工具来生成事件而不是服务。有没有办法使用 CLI 工具来解决这个问题?我找不到任何执行此操作的 cli 工具。
我想过在文件中维护一个分区号,并读取它,然后在 kafkacat 命令中提供分区号,然后增加 (partition-number)%partitions,但这不是线程安全的。
注意:主题没有Key,我只产生Value消息,不关心Key。