0

Spring Boot 环境监听 kafka 主题(@KafkaListener / @StreamListener) 配置监听工厂以批处理方式运行:

ConcurrentKafkaListenerContainerFactory # setBatchListener

或通过application.properties

spring.kafka.listener.type=batch

如何配置框架,以便给定两个数字:N 和 T,它将尝试为侦听器获取 N 条记录,但不会等待超过 T 秒,如下所述:https ://doc.akka.io/docs /akka/2.5/stream/operators/Source-or-Flow/groupedWithin.html
我看过的一些属性:

  • max-poll-records 确保您不会在批次中获得超过 N 个数字
  • fetch-min-size在 fetch 请求中至少获取此数量的数据
  • fetch-max-wait但不要等待超过必要的时间
  • idleBetweenPolls在民意调查之间睡一会儿:)

似乎fetch-min-size结合fetch-max-wait应该这样做,但它们比较字节,而不是消息/记录。

显然可以手动实现,我正在寻找是否可以为我配置 Spring。

4

1 回答 1

2

似乎fetch-min-size结合fetch-max-wait应该这样做,但它们比较字节,而不是消息/记录。

这是正确的,不幸的是,Kafka 没有提供诸如fetch.min.records.

我预计 Spring 不会将此功能置于 kafka-clients 之上。最好在 Kafka 本身中要求一个新功能。

Spring 根本不操作从轮询返回的记录,除了您现在可以指定subBatchPerPartition获取仅包含一个分区的批次,以便在使用仅一次读取/处理/写入时正确支持僵尸防护。

于 2019-11-06T13:50:41.683 回答