14

嗨,我目前刚开始接触 Spring Kafka,并已成功地为我的侦听器配置了单个 KafkaListenerContainerFactory。现在我想添加多个 KafkaListenerContainerFactory(一个用于消息为 JSON 格式的主题,另一个用于消息为字符串的主题)。请参见下面的代码:

/**
 * Declares two independent Kafka listener container factories: one whose record
 * values are deserialized from JSON into {@code Record}, and one whose record
 * values are plain strings.
 *
 * <p>A listener selects a factory by bean name, e.g.
 * {@code @KafkaListener(containerFactory = "kafkaJsonListenerContainerFactory")}.
 *
 * <p>NOTE: neither {@code ConsumerFactory} bean declared here is a
 * {@code ConsumerFactory<Object, Object>}, and no bean is named
 * {@code kafkaListenerContainerFactory}; Spring Boot's Kafka auto-configuration
 * therefore still tries (and fails) to build its own default container factory
 * unless it is excluded or one of the beans is adapted accordingly.
 */
@EnableKafka
@Configuration
public class KafkaConsumersConfig {

    private final KafkaConfiguration kafkaConfiguration;

    /**
     * @param kafkaConfiguration externalized Kafka settings (broker address, group ids,
     *                           commit and session settings) read by the config maps below
     */
    @Autowired
    public KafkaConsumersConfig(KafkaConfiguration kafkaConfiguration) {
        this.kafkaConfiguration = kafkaConfiguration;
    }

    /**
     * @return container factory for listeners whose values are JSON-encoded {@code Record}s
     */
    @Bean
    public KafkaListenerContainerFactory<?> kafkaJsonListenerContainerFactory() {
        return listenerContainerFactory(jsonConsumerFactory());
    }

    /**
     * @return consumer factory with String keys and JSON-deserialized {@code Record} values
     */
    @Bean
    public ConsumerFactory<String, Record> jsonConsumerFactory() {
        JsonDeserializer<Record> jsonDeserializer = new JsonDeserializer<>(Record.class);
        return new DefaultKafkaConsumerFactory<>(jsonConsumerConfigs(), new StringDeserializer(), jsonDeserializer);
    }

    /**
     * @return consumer properties for the JSON consumers (uses the JSON group id)
     */
    @Bean
    public Map<String, Object> jsonConsumerConfigs() {
        return consumerConfigs(kafkaConfiguration.getJsonGroupId());
    }

    /**
     * @return container factory for listeners whose values are plain strings
     */
    @Bean
    public KafkaListenerContainerFactory<?> kafkaFileListenerContainerFactory() {
        return listenerContainerFactory(fileConsumerFactory());
    }

    /**
     * @return consumer factory with String keys and String values
     */
    @Bean
    public ConsumerFactory<String, String> fileConsumerFactory() {
        // The config map carries no key/value deserializer entries, and Kafka has no
        // default for them, so the deserializers must be supplied explicitly here
        // (same approach as jsonConsumerFactory()).
        return new DefaultKafkaConsumerFactory<>(
                fileConsumerConfigs(), new StringDeserializer(), new StringDeserializer());
    }

    /**
     * @return consumer properties for the file (string) consumers (uses the file group id)
     */
    @Bean
    public Map<String, Object> fileConsumerConfigs() {
        return consumerConfigs(kafkaConfiguration.getFileGroupId());
    }

    /** Builds the consumer properties shared by both consumer kinds; only the group id differs. */
    private Map<String, Object> consumerConfigs(Object groupId) {
        Map<String, Object> propsMap = new HashMap<>();
        propsMap.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, kafkaConfiguration.getBrokerAddress());
        propsMap.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);
        propsMap.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, kafkaConfiguration.getAutoCommit());
        propsMap.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, kafkaConfiguration.getAutoCommitInterval());
        propsMap.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, kafkaConfiguration.getSessionTimeout());
        return propsMap;
    }

    /** Wraps a consumer factory in a concurrent container factory (concurrency 3, auto-startup on). */
    private <V> ConcurrentKafkaListenerContainerFactory<String, V> listenerContainerFactory(
            ConsumerFactory<String, V> consumerFactory) {
        ConcurrentKafkaListenerContainerFactory<String, V> factory = new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(consumerFactory);
        factory.setConcurrency(3);
        factory.setAutoStartup(true);
        return factory;
    }
}

运行它会给我以下错误:

Description:

Parameter 1 of method kafkaListenerContainerFactory in org.springframework.boot.autoconfigure.kafka.KafkaAnnotationDrivenConfiguration required a bean of type 'org.springframework.kafka.core.ConsumerFactory' that could not be found.
    - Bean method 'kafkaConsumerFactory' in 'KafkaAutoConfiguration' not loaded because @ConditionalOnMissingBean (types: org.springframework.kafka.core.ConsumerFactory; SearchStrategy: all) found beans 'jsonConsumerFactory', 'fileConsumerFactory'


Action:

Consider revisiting the conditions above or defining a bean of type 'org.springframework.kafka.core.ConsumerFactory' in your configuration.

我究竟做错了什么?

4

3 回答 3

24

看起来您没有依赖 Spring Boot 的 Kafka 自动配置(Auto Configuration)。

Spring Boot 在 KafkaAutoConfiguration 中提供了:

@Bean
@ConditionalOnMissingBean(ConsumerFactory.class)
public ConsumerFactory<?, ?> kafkaConsumerFactory() {

既然你定义了 jsonConsumerFactory 和 fileConsumerFactory,它们就会取代自动配置提供的那个。

但另一方面,在 KafkaAnnotationDrivenConfiguration 中,你的工厂都无法被应用:

@Bean
@ConditionalOnMissingBean(name = "kafkaListenerContainerFactory")
public ConcurrentKafkaListenerContainerFactory<?, ?> kafkaListenerContainerFactory(
        ConcurrentKafkaListenerContainerFactoryConfigurer configurer,
        ConsumerFactory<Object, Object> kafkaConsumerFactory) {

因为你的 ConsumerFactory bean 都不是 ConsumerFactory<Object, Object> 类型的。

所以:

  • 只需在应用程序属性文件中添加以下内容,即可将 KafkaAutoConfiguration 从 Spring Boot 自动配置中排除:spring.autoconfigure.exclude=org.springframework.boot.autoconfigure.kafka.KafkaAutoConfiguration
  • 或者将其中一个 KafkaListenerContainerFactory bean 重命名为 kafkaListenerContainerFactory,以覆盖 Boot 中的默认 bean
  • 或者将其中一个 ConsumerFactory bean 声明为 ConsumerFactory<Object, Object> 类型。
于 2017-03-31T13:58:19.760 回答
6

我已经在下面的代码中实现了它,它对我来说工作正常。

// LISTENER 1
/**
 * Consumer factory for the first listener: String keys, values deserialized
 * from JSON into {@code YourCustomObject1}, consumer group {@code YOUR-GROUP-1}.
 *
 * <p>NOTE(review): the condition is keyed on the listener factory bean name
 * ({@code yourListenerFactory1}), not on this bean's own name — confirm that is intended.
 *
 * @return a consumer factory connecting to {@code localhost:9092}
 */
@Bean
@ConditionalOnMissingBean(name = "yourListenerFactory1")
public ConsumerFactory<String, YourCustomObject1> yourConsumerFactory1() {
    final Map<String, Object> consumerSettings = new HashMap<>();
    consumerSettings.put(ConsumerConfig.GROUP_ID_CONFIG, "YOUR-GROUP-1");
    consumerSettings.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");

    final StringDeserializer keyDeserializer = new StringDeserializer();
    final JsonDeserializer<YourCustomObject1> valueDeserializer =
            new JsonDeserializer<>(YourCustomObject1.class);
    return new DefaultKafkaConsumerFactory<>(consumerSettings, keyDeserializer, valueDeserializer);
}


/**
 * Listener container factory for the first listener, registered under the bean
 * name {@code yourListenerFactory1} and backed by {@code yourConsumerFactory1()}.
 *
 * @return the configured container factory
 */
@Bean(name = "yourListenerFactory1")
public ConcurrentKafkaListenerContainerFactory<String, YourCustomObject1> 
  yourListenerFactory1() {
   ConcurrentKafkaListenerContainerFactory<String, YourCustomObject1> factory =
       new ConcurrentKafkaListenerContainerFactory<>();
   factory.setConsumerFactory(yourConsumerFactory1());
   // Container-level settings are adjusted on the factory's own ContainerProperties.
   ContainerProperties containerProperties = factory.getContainerProperties();
   // The poll timeout and ack mode values are elided in this snippet; supply real values.
   containerProperties.setPollTimeout(...);
   containerProperties.setAckMode(AckMode...);
   return factory;
}


// LISTENER 2
/**
 * Consumer factory for the second listener: String keys, values deserialized
 * from JSON into {@code YourCustomObject2}, consumer group {@code YOUR-GROUP-2}.
 *
 * <p>NOTE(review): the condition is keyed on the listener factory bean name
 * ({@code yourListenerFactory2}), not on this bean's own name — confirm that is intended.
 *
 * @return a consumer factory connecting to {@code localhost:9092}
 */
@Bean
@ConditionalOnMissingBean(name = "yourListenerFactory2")
public ConsumerFactory<String, YourCustomObject2> yourConsumerFactory2() {
    final Map<String, Object> consumerSettings = new HashMap<>();
    consumerSettings.put(ConsumerConfig.GROUP_ID_CONFIG, "YOUR-GROUP-2");
    consumerSettings.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");

    final StringDeserializer keyDeserializer = new StringDeserializer();
    final JsonDeserializer<YourCustomObject2> valueDeserializer =
            new JsonDeserializer<>(YourCustomObject2.class);
    return new DefaultKafkaConsumerFactory<>(consumerSettings, keyDeserializer, valueDeserializer);
}


/**
 * Listener container factory for the second listener, registered under the bean
 * name {@code yourListenerFactory2} and backed by {@code yourConsumerFactory2()}.
 *
 * @return the configured container factory
 */
@Bean(name = "yourListenerFactory2")
public ConcurrentKafkaListenerContainerFactory<String, YourCustomObject2> 
   yourListenerFactory2() {
    ConcurrentKafkaListenerContainerFactory<String, YourCustomObject2> factory 
         =  new ConcurrentKafkaListenerContainerFactory<>();
    factory.setConsumerFactory(yourConsumerFactory2());
    // Container-level settings are adjusted on the factory's own ContainerProperties.
    ContainerProperties containerProperties = factory.getContainerProperties();
    // The poll timeout and ack mode values are elided in this snippet; supply real values.
    containerProperties.setPollTimeout(...);
    containerProperties.setAckMode(AckMode...);
    return factory;
 }

另外,我还设置了 spring.autoconfigure.exclude 属性(这是必须的):spring.autoconfigure.exclude=org.springframework.boot.autoconfigure.kafka.KafkaAutoConfiguration

这是我的消费者配置

消费者 1

/**
 * Listener for the first consumer: subscribes via the topic pattern
 * {@code your-topic-1} and is built by the {@code yourListenerFactory1}
 * container factory.
 *
 * <p>NOTE(review): the payload is a single object while the header parameters
 * are declared as lists — confirm whether this is meant to be a batch listener.
 * NOTE(review): receiving an {@code Acknowledgment} presumably requires a manual
 * ack mode on the container factory — confirm.
 *
 * @param data           the deserialized record value
 * @param acknowledgment handle for acknowledging the record
 * @param partitions     value(s) of the received-partition header
 * @param topics         value(s) of the received-topic header
 * @param offsets        value(s) of the offset header
 * @throws Exception declared by the snippet; the body is elided
 */
@KafkaListener(id = "your-cousumer-1",
  topicPattern = "your-topic-1",
  containerFactory = "yourListenerFactory1")
 public void consumer1(YourCustomObject1 data,
                       Acknowledgment acknowledgment,
      @Header(KafkaHeaders.RECEIVED_PARTITION_ID) List<Integer> partitions,
      @Header(KafkaHeaders.RECEIVED_TOPIC) List<String> topics,
      @Header(KafkaHeaders.OFFSET) List<Long> offsets) throws Exception { ... }

消费者 2

  /**
   * Listener for the second consumer: subscribes via the topic pattern
   * {@code your-topic-2} and is built by the {@code yourListenerFactory2}
   * container factory.
   *
   * <p>NOTE(review): the payload is a single object while the header parameters
   * are declared as lists — confirm whether this is meant to be a batch listener.
   * NOTE(review): receiving an {@code Acknowledgment} presumably requires a manual
   * ack mode on the container factory — confirm.
   *
   * @param data           the deserialized record value
   * @param acknowledgment handle for acknowledging the record
   * @param partitions     value(s) of the received-partition header
   * @param topics         value(s) of the received-topic header
   * @param offsets        value(s) of the offset header
   * @throws Exception declared by the snippet; the body is elided
   */
  @KafkaListener(id = "your-cousumer-2",
                 topicPattern = "your-topic-2",
                 containerFactory = "yourListenerFactory2")
  public void consumer2(YourCustomObject2 data,
                        Acknowledgment acknowledgment,
      @Header(KafkaHeaders.RECEIVED_PARTITION_ID) List<Integer> partitions,
      @Header(KafkaHeaders.RECEIVED_TOPIC) List<String> topics,
      @Header(KafkaHeaders.OFFSET) List<Long> offsets) throws Exception { ...  }

另外,我的 KafkaTemplate 定义如下:

// Injected template with String keys and Object values.
// NOTE(review): its bean definition is not shown in this snippet — confirm where it is declared.
@Autowired
KafkaTemplate<String, Object> kafkaTemplate;
于 2018-09-21T10:41:05.467 回答
1

您可以在 KafkaListener 定义中定义每个容器工厂,如下所示:

// Listener on "fileTopic"; selects the kafkaFileListenerContainerFactory bean by name.
// Parameters and body are elided in this snippet.
@KafkaListener(topics = "fileTopic", containerFactory = "kafkaFileListenerContainerFactory")
public void fileConsumer(...) {...}

// Listener on "jsonTopic"; selects the kafkaJsonListenerContainerFactory bean by name.
// Parameters and body are elided in this snippet.
@KafkaListener(topics = "jsonTopic", containerFactory = "kafkaJsonListenerContainerFactory")
public void jsonConsumer(...) {...}
于 2018-04-10T08:52:13.470 回答