I have a Kafka consumer in a Spring Boot application. I keep ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG set to false, and my consumer acknowledges messages manually. Spring-Kafka version: 2.2.11.RELEASE

My configuration:

 @Override
  public Map<String, Object> consumerConfig() {
    Map<String, Object> props = new HashMap<>();
    props.put(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, securityProtocol);
    props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
    props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
    props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);
    props.put(ConsumerConfig.HEARTBEAT_INTERVAL_MS_CONFIG, heartbeatInterval);
    props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, ErrorHandlingDeserializer2.class);
    props.put(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG, maxPollIntervalMs);
    props.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, sessionTimeout);
    props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, ErrorHandlingDeserializer2.class);
    props.put(ErrorHandlingDeserializer2.KEY_DESERIALIZER_CLASS, KafkaAvroDeserializer.class);
    props.put(ErrorHandlingDeserializer2.VALUE_DESERIALIZER_CLASS, KafkaAvroDeserializer.class);
    props.put(KafkaAvroDeserializerConfig.SCHEMA_REGISTRY_URL_CONFIG, schemaRegistryServers);

    return props;
  }

Container factory:

 ConcurrentKafkaListenerContainerFactory<K, V> kvConcurrentKafkaListenerContainerFactory =
        new ConcurrentKafkaListenerContainerFactory<>();
    kvConcurrentKafkaListenerContainerFactory.setConsumerFactory(
        new DefaultKafkaConsumerFactory<>(props, getAvroKeyDeserializer(), getAvroValueDeserializer()));
    kvConcurrentKafkaListenerContainerFactory.getContainerProperties()
                                             .setAckOnError(false);
    kvConcurrentKafkaListenerContainerFactory.getContainerProperties()
                                             .setAckMode(
                                                 ContainerProperties.AckMode.MANUAL_IMMEDIATE);

Kafka consumer:

@KafkaListener(topics = "${topic-name}", groupId = "${group-id}", containerFactory = CONTAINER_FACTORY)
  public void consume(ConsumerRecord<Key, Envelope> record, Acknowledgment acknowledgment) {
    final Envelope envelope = record.value();
    if (/* some condition */) {
      // logic
    }
    acknowledgment.acknowledge();
  }

If the application crashes inside the if statement, the message is lost.

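My understanding is that, because acknowledgment.acknowledge() is never reached before the crash, the same message should be processed again when the application restarts.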
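That expectation can be sketched as a toy in-memory model. This is not Kafka or Spring code: the `Partition` class, the `runConsumer` method, and the `crashAtOffset` parameter are hypothetical names made up for illustration. The model only assumes that a restarted consumer resumes from the last committed offset and that the offset is committed only after processing succeeds.

```java
import java.util.ArrayList;
import java.util.List;

/** Toy model of manual acknowledgment; hypothetical, no Kafka or Spring involved. */
class ManualAckSketch {

    /** Stand-in for one partition's log plus the consumer group's committed offset. */
    static final class Partition {
        final List<String> log = new ArrayList<>();
        long committedOffset = 0; // offset a restarted consumer would resume from
    }

    /** Simulates the application dying while a record is being processed. */
    static final class SimulatedCrash extends RuntimeException {}

    /**
     * Runs one consumer "lifetime": starts at the committed offset, processes each
     * record, and commits offset + 1 only after processing succeeds.
     * Pass a negative crashAtOffset to run without a crash.
     */
    static List<String> runConsumer(Partition p, long crashAtOffset) {
        List<String> processed = new ArrayList<>();
        try {
            for (long offset = p.committedOffset; offset < p.log.size(); offset++) {
                String value = p.log.get((int) offset);
                if (offset == crashAtOffset) {
                    throw new SimulatedCrash(); // dies before the acknowledge step
                }
                processed.add(value);
                p.committedOffset = offset + 1; // plays the role of acknowledge()
            }
        } catch (SimulatedCrash e) {
            // The "process" is gone; nothing was committed for the failing record.
        }
        return processed;
    }

    public static void main(String[] args) {
        Partition p = new Partition();
        p.log.add("m0");
        p.log.add("m1");
        p.log.add("m2");

        List<String> firstRun = runConsumer(p, 1);   // crashes while handling m1
        List<String> secondRun = runConsumer(p, -1); // restart, no crash

        System.out.println(firstRun + " then " + secondRun); // prints "[m0] then [m1, m2]"
    }
}
```

In this model the record at the crashing offset is handled again after the restart, which is the behavior I expect from the real consumer but am not seeing.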

I need help understanding what I am doing wrong here.
