使用 spring-cloud-stream 的 kafka binder,如何配置并发消息消费者(在单个消费者 jvm 中)?如果我理解正确,使用 kafka 时并发消息消耗需要分区,但scs 文档表明要使用分区,您需要通过 partitionKeyExpression 或 partitionKeyExtractorClass 在生产者中指定分区选择。Kafka 文档提到了循环分区。
scs 文档根本没有提到 spring.cloud.stream.bindings.*.concurrency ,尽管这在我上面描述的用例中似乎很重要。使用生产者配置
spring:
cloud:
stream:
bindings:
customer-save:
destination: customer-save
group: customer-save
content-type: application/json
partitionCount: 3
和消费者配置
spring:
cloud:
stream:
bindings:
customer-save:
destination: customer-save
group: customer-save
content-type: application/x-java-object;type=foo.Customer
partitioned: true
concurrency: 3
我似乎得到了我想要的行为(至少在某种程度上)。我可以看到有时有 3 个消费者线程处于活动状态,尽管除了轮询之外似乎确实存在一些分区,因为有些消息似乎在等待繁忙的消费者线程并在该线程完成后被消耗。我认为这是因为消息被发送到同一个分区。
当我没有指定 partitionKeyExpression 或 partitionKeyExtractorClass 时,生产者是否使用了一些默认的密钥提取和分区策略?这是使用 kafka 设置 scs 消费者的合适方法,您希望多个线程消费消息以增加消费者吞吐量?