1

我正在开发一个有 2 项服务的项目:读取、转换消息,然后写入另一个 Kafka。这两种服务的 Kafka 配置是不同的。这是我的 application.yml

    # Shared Kafka configuration for both services.
    # (Indentation fixed: "spring:" must be the top-level key, "kafka:" nested under it.)
    spring:
      kafka:
        bootstrap-servers: localhost:9092
        # Custom topic-name properties, read via @Value in the two config classes.
        sourcetopic1: topic1
        destinationtopic1: topic2
        sourcetopic2: topic3
        destinationtopic2: topic4
        producer:
          key-serializer: org.apache.kafka.common.serialization.StringSerializer
          value-serializer: org.springframework.kafka.support.serializer.JsonSerializer
        consumer:
          group-id: TestCollector
          # BUG FIX: a space is required after the colon; "client-id:TestCollector01"
          # is parsed as one plain scalar, so client-id was never actually set.
          client-id: TestCollector01
          key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
          value-deserializer: org.apache.kafka.common.serialization.StringDeserializer

这些是我的两个服务的配置文件:

  1. Service1KafkaConfig

     // Kafka beans for Service1 (consumes from spring.kafka.sourcetopic1).
     // NOTE(review): @Configuration was missing in the snippet — without it Spring
     // never registers these @Bean methods; confirm it wasn't just lost in the paste.
     @Configuration
     public class KafkaConfig {

         /**
          * Receiver options for Service1, subscribed to the topic named by
          * spring.kafka.sourcetopic1. All consumer properties (group-id,
          * deserializers, ...) are taken from application.yml via KafkaProperties.
          */
         @Bean
         public ReceiverOptions<String, String> kafkaReceiverOptions(@Value("${spring.kafka.sourcetopic1}") String topic, KafkaProperties kafkaProperties) {
             ReceiverOptions<String, String> basicReceiverOptions = ReceiverOptions.create(kafkaProperties.buildConsumerProperties());
             return basicReceiverOptions.subscription(Collections.singletonList(topic));
         }

         /**
          * Reactive consumer template for Service1. The project declares a second
          * ReceiverOptions bean (service2KafkaReceiverOptions), so the dependency is
          * qualified explicitly instead of relying on by-type/by-name resolution.
          */
         @Bean
         public ReactiveKafkaConsumerTemplate<String, String> kafkaConsumerTemplate(
                 @Qualifier("kafkaReceiverOptions") ReceiverOptions<String, String> kafkaReceiverOptions) {
             return new ReactiveKafkaConsumerTemplate<String, String>(kafkaReceiverOptions);
         }
     }

  2. Service2KafkaConfig(服务 2 的配置)

     // Kafka beans for Service2 (consumes from spring.kafka.sourcetopic2).
     @Configuration
     public class Service2KafkaConfig {

         /**
          * Receiver options for Service2.
          * BUG FIX: the placeholder was ${spring.kafka.sourcetopic3}, but
          * application.yml only defines sourcetopic2 — the context fails to start
          * on the unresolvable placeholder.
          */
         @Bean
         public ReceiverOptions<String, String> service2KafkaReceiverOptions(@Value("${spring.kafka.sourcetopic2}") String topic, KafkaProperties kafkaProperties) {
             ReceiverOptions<String, String> basicReceiverOptions = ReceiverOptions.create(kafkaProperties.buildConsumerProperties());
             return basicReceiverOptions.subscription(Collections.singletonList(topic));
         }

         /**
          * Reactive consumer template for Service2.
          * BUG FIX: the parameter was named "kafkaReceiverOptions", which by-name
          * autowiring resolves to Service1's ReceiverOptions bean — so this template
          * silently subscribed to topic1 (the reported symptom). Qualify the
          * Service2 options bean explicitly.
          */
         @Bean
         public ReactiveKafkaConsumerTemplate<String, String> service2KafkaConsumerTemplate(
                 @Qualifier("service2KafkaReceiverOptions") ReceiverOptions<String, String> service2KafkaReceiverOptions) {
             return new ReactiveKafkaConsumerTemplate<String, String>(service2KafkaReceiverOptions);
         }
     }

我在各自的服务中自动装配这些 bean:

Service1:我没有为 service1 添加 ProcessRecord 方法,因为我觉得这个问题不需要它。如果需要,请告诉我。

  @Slf4j
@Service
public class Service1 implements CommandLineRunner {
    @Autowired
    public ReactiveKafkaConsumerTemplate<String, String> service1KafkaConsumerTemplate;


    public Flux<String> consume1() {
        return service1KafkaConsumerTemplate.receiveAutoAck()
                .doOnNext(consumerRecord -> log.info("received key={}, value={} from topic={}, offset={}",
                        consumerRecord.key(),
                        consumerRecord.value(),
                        consumerRecord.topic(),
                        consumerRecord.offset())
                )
.donOnNext(s->ProcessRecord(s))
                .map(ConsumerRecord::value)
                .doOnNext(metric -> log.debug("successfully consumed {}={}", Metric1[].class.getSimpleName(), metric))
                .doOnError(throwable -> log.error("Error while consuming : {}", throwable.getMessage()));

    }

    @Override
    public void run(String... args) throws Exception {
        consume1().subscribe();
    }
}

服务2:

@Slf4j
@Service
public class Service2 implements CommandLineRunner {

    // Template bound to Service2's receiver options (see Service2KafkaConfig).
    @Autowired
    @Qualifier("service2KafkaConsumerTemplate")
    public ReactiveKafkaConsumerTemplate<String, String> service2KafkaConsumerTemplate;

    /**
     * Streams the values of records received from Service2's source topic,
     * logging each record on arrival and any error on failure.
     */
    public Flux<String> consume2() {
        Flux<ConsumerRecord<String, String>> received = service2KafkaConsumerTemplate.receiveAutoAck();
        Flux<ConsumerRecord<String, String>> logged = received.doOnNext(record ->
                log.info("received key={}, value={} from topic={}, offset={}",
                        record.key(), record.value(), record.topic(), record.offset()));
        return logged
                .map(ConsumerRecord::value)
                .doOnNext(metric -> log.debug("successfully consumed {}={}", Metric[].class.getSimpleName(), metric))
                .doOnError(throwable -> log.error("Error while consuming : {}", throwable.getMessage()));
    }

    @Override
    public void run(String... args) throws Exception {
        consume2().subscribe();
    }
}

当我运行应用程序时,我只能看到一个消费者启动并订阅了 topic1。是否可以在同一个项目中运行多个 Kafka 消费者?如果可以的话,你能告诉我需要做什么才能让它们同时运行吗?

4

1 回答 1

1

看起来它们都使用相同的主题和配置;如果该主题只有一个分区,并且两个消费者在同一个消费者组中,则只有其中一个会获取到数据。

如果你想让它们都得到相同的数据,你必须把它们放在不同的消费者组中。

这按预期工作:

/**
 * Starts a standalone reactive consumer in group "group1" on topic INPUT_1,
 * acknowledging each record and committing one offset at a time.
 */
@Bean
public ApplicationRunner runner1() {
    return args -> {
        Map<String, Object> consumerProps = Map.of(
                ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092",
                ConsumerConfig.GROUP_ID_CONFIG, "group1",
                ConsumerConfig.MAX_POLL_RECORDS_CONFIG, 1,
                ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
        ReceiverOptions<String, String> options = ReceiverOptions.<String, String> create(consumerProps)
                .withKeyDeserializer(new StringDeserializer())
                .withValueDeserializer(new StringDeserializer())
                .addAssignListener(assignments -> log.info("Assigned: " + assignments))
                .commitBatchSize(1)
                .subscription(Collections.singletonList("INPUT_1"));
        KafkaReceiver.create(options)
                .receive()
                .doOnNext(record -> {
                    System.out.println("one: " + record.value() + "@" + record.offset());
                    record.receiverOffset().acknowledge();
                })
                .doOnError(System.out::println)
                .subscribe();
    };
}

/**
 * Second independent reactive consumer on the same topic (INPUT_1) but in a
 * different group ("group2"), so both runners receive every record.
 */
@Bean
public ApplicationRunner runner2() {
    return args -> {
        Map<String, Object> consumerProps = Map.of(
                ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092",
                ConsumerConfig.GROUP_ID_CONFIG, "group2",
                ConsumerConfig.MAX_POLL_RECORDS_CONFIG, 1,
                ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
        ReceiverOptions<String, String> options = ReceiverOptions.<String, String> create(consumerProps)
                .withKeyDeserializer(new StringDeserializer())
                .withValueDeserializer(new StringDeserializer())
                .addAssignListener(assignments -> log.info("Assigned: " + assignments))
                .commitBatchSize(1)
                .subscription(Collections.singletonList("INPUT_1"));
        KafkaReceiver.create(options)
                .receive()
                .doOnNext(record -> {
                    System.out.println("two: " + record.value() + "@" + record.offset());
                    record.receiverOffset().acknowledge();
                })
                .doOnError(System.out::println)
                .subscribe();
    };
}
one: foo@16
two: foo@16
于 2021-09-10T18:31:18.437 回答