0

即使在特定配置之后,Spring Cloud Kafka 流也不会在反序列化错误时重试。期望是,它应该根据配置的重试策略重试,最后将失败的消息推送到 DLQ。

配置如下。

spring.cloud.stream.bindings.input_topic.consumer.maxAttempts=7
spring.cloud.stream.bindings.input_topic.consumer.backOffInitialInterval=500
spring.cloud.stream.bindings.input_topic.consumer.backOffMultiplier=10.0
spring.cloud.stream.bindings.input_topic.consumer.backOffMaxInterval=100000
spring.cloud.stream.bindings.iinput_topic.consumer.defaultRetryable=true
public interface MyStreams {

    String INPUT_TOPIC = "input_topic";
    String INPUT_TOPIC2 = "input_topic2";
    String ERROR = "apperror";
    String OUTPUT = "output";

    @Input(INPUT_TOPIC)
    KStream<String, InObject> inboundTopic();

    @Input(INPUT_TOPIC2)
    KStream<Object, InObject> inboundTOPIC2();

    @Output(OUTPUT)
    KStream<Object, outObject> outbound();

    @Output(ERROR)
    MessageChannel outboundError();
}

@StreamListener(MyStreams.INPUT_TOPIC)
    @SendTo(MyStreams.OUTPUT)
    public KStream<Key, outObject> processSwft(KStream<Key, InObject> myStream) {
        return myStream.mapValues(this::transform);
    }

KafkaTopicProvisioner.java 中的 metadataRetryOperations 始终为空,因此它在afterPropertiesSet().

public KafkaTopicProvisioner(KafkaBinderConfigurationProperties kafkaBinderConfigurationProperties, KafkaProperties kafkaProperties) {
        Assert.isTrue(kafkaProperties != null, "KafkaProperties cannot be null");
        this.adminClientProperties = kafkaProperties.buildAdminProperties();
        this.configurationProperties = kafkaBinderConfigurationProperties;
        this.normalalizeBootPropsWithBinder(this.adminClientProperties, kafkaProperties, kafkaBinderConfigurationProperties);
    }

    public void setMetadataRetryOperations(RetryOperations metadataRetryOperations) {
        this.metadataRetryOperations = metadataRetryOperations;
    }

    public void afterPropertiesSet() throws Exception {
        if (this.metadataRetryOperations == null) {
            RetryTemplate retryTemplate = new RetryTemplate();
            SimpleRetryPolicy simpleRetryPolicy = new SimpleRetryPolicy();
            simpleRetryPolicy.setMaxAttempts(10);
            retryTemplate.setRetryPolicy(simpleRetryPolicy);
            ExponentialBackOffPolicy backOffPolicy = new ExponentialBackOffPolicy();
            backOffPolicy.setInitialInterval(100L);
            backOffPolicy.setMultiplier(2.0D);
            backOffPolicy.setMaxInterval(1000L);
            retryTemplate.setBackOffPolicy(backOffPolicy);
            this.metadataRetryOperations = retryTemplate;
        }

    }
4

2 回答 2

0

即使在特定配置之后,Spring Cloud Kafka 流也不会在反序列化错误时重试。

当遇到反序列化错误时,您看到的行为与 Kafka Streams 的默认设置相匹配。

来自https://docs.confluent.io/current/streams/faq.html#handling-corrupted-records-and-deserialization-errors-poison-pill-records

LogAndFailExceptionHandler实现DeserializationExceptionHandler并且是 Kafka Streams 中的默认设置。它通过记录错误并抛出致命错误来停止您的 Streams 应用程序来处理任何遇到的反序列化异常。如果您的应用程序被配置为使用LogAndFailExceptionHandler,那么当您的应用程序实例遇到损坏的记录时,它会通过终止自身来快速失败。

我不熟悉 Spring 的 Kafka Streams 外观,但您可能需要配置所需的org.apache.kafka.streams.errors.DeserializationExceptionHandler,而不是配置重试(它们用于不同的目的)。或者,您可能想要实现自己的自定义处理程序(有关更多信息,请参见上面的链接),然后配置 Spring/KStreams 以使用它。

于 2019-06-26T06:29:37.977 回答
0

重试配置仅适用于MessageChannel基于 - 的活页夹。使用 KStream binder,Spring 仅帮助以规定的方式构建拓扑,一旦构建了拓扑,它就不会参与消息流。

spring-kafka(binder 使用的)的下一个版本添加了RecoveringDeserializationExceptionHandler( commit here ); 虽然它不能帮助重试,但它可以与DeadLetterPublishingRecoverer将记录发送到死信主题一起使用。

您可以RetryTemplate在处理器/变压器中使用 a 来重试特定操作。

于 2019-06-26T13:07:14.790 回答