0

我已经构建了一个生产者 spring 云流应用程序和 kafka 作为 binder。这是application.yml:

spring:
cloud:
  stream:
    instanceCount : 1
    bindings:
      output:
        destination: topic-sink
        producer:
           partitionSelectorClass: com.partition.CustomPartition
           partitionCount: 1        
...

我有两个实例(在单个 jvm 上运行的同一个应用程序)作为消费者。这是application.yml:

spring:  
cloud:
  stream:
    bindings:
      input:
        destination: topic-sink
        group: hdfs-sink
        consumer:
          partitioned: true
...

我对 kafka 组的理解是,对于同一组中的消费者,消息只会被消费一次。假设,如果生产者应用程序产生消息 A,B 并且同一组中有两个消费者应用程序,消息 A 将由消费者 1 读取,消息 B、C 将由消费者 2 读取。但是,我的消费者正在消费相同消息。我的假设是错误的吗?

4

1 回答 1

0

我得到了解决方案,谢谢 Arek。对于 1 个分区和 1 个消费者。我在 Spring Cloud Stream 应用程序中分享生产者\消费者的解决方案。制片人:

spring: cloud: stream: instanceCount : 1 bindings: output: destination: topic-sink producer: partitionSelectorClass: com.partition.CustomPartition partitionCount: 1
消费者:

spring: cloud: stream: instanceIndex: 0 #between 0 and instanceCount - 1 instanceCount: 1 bindings: input: destination: topic-sink group: hdfs-sink consumer: partitioned: true
kafka: binder: autoAddPartitions: true

于 2016-09-30T16:26:49.720 回答