7

使用 spring-cloud-stream 的 kafka binder,如何配置并发消息消费者(在单个消费者 jvm 中)?如果我理解正确,使用 kafka 时并发消息消耗需要分区,但scs 文档表明要使用分区,您需要通过 partitionKeyExpression 或 partitionKeyExtractorClass 在生产者中指定分区选择。Kafka 文档提到了循环分区。

scs 文档根本没有提到 spring.cloud.stream.bindings.*.concurrency ,尽管这在我上面描述的用例中似乎很重要。使用生产者配置

spring:
  cloud:
    stream:
      bindings:
        customer-save:
          destination: customer-save
          group: customer-save
          content-type: application/json
          partitionCount: 3

和消费者配置

spring:
  cloud:
    stream:
      bindings:
        customer-save: 
          destination: customer-save
          group: customer-save
          content-type: application/x-java-object;type=foo.Customer
          partitioned: true
          concurrency: 3

我似乎得到了我想要的行为(至少在某种程度上)。我可以看到有时有 3 个消费者线程处于活动状态,尽管除了轮询之外似乎确实存在一些分区,因为有些消息似乎在等待繁忙的消费者线程并在该线程完成后被消耗。我认为这是因为消息被发送到同一个分区。

当我没有指定 partitionKeyExpression 或 partitionKeyExtractorClass 时,生产者是否使用了一些默认的密钥提取和分区策略?这是使用 kafka 设置 scs 消费者的合适方法,您希望多个线程消费消息以增加消费者吞吐量?

4

1 回答 1

4

由于您的生产者没有分区(没有partitionKeyExpression设置),生产者端将在 3 个分区上循环(如果这不是观察到的行为,请在 Git Hub 中打开一张票)。如果您配置了 a ,partitionKeyExpression那么生产者将根据配置的逻辑有效地对数据进行分区。

在消费者方面,我们确保线程/分区的亲和性,因为这是一个广受推崇的 Kafka 约定——我们确保给定分区上的消息按顺序处理——这可能解释了你观察到的行为。如果将消息 A、B、C、D 发送到分区 0、1、2、0 - D 将不得不等到 A 被处理,即使有两个其他线程可用。

增加吞吐量的一种选择是过度分区(这是 Kafka 中相当典型的策略)。这将进一步分散消息,并增加将消息发送到不同线程的机会。

如果您不关心排序,则增加吞吐量的另一个选择是在下游异步处理消息:例如,通过将输入通道桥接到 ExecutorChannel。

一般而言,partitioned是指客户端接收分区数据的能力(Kafka 客户端始终是分区的,但此设置也适用于 Rabbit 和/或 Redis)。它与属性结合使用instanceIndexinstanceCount确保主题的分区在多个应用程序实例之间正确划分(另请参阅http://docs.spring.io/spring-cloud-stream/docs/1.0.0.M4 /reference/htmlsingle/index.html#_instance_index_and_instance_count )

于 2016-03-07T22:36:35.383 回答