0

我正在使用带有模式注册表的 spring-cloud-stream kafka binder。(不是 kakfa 流)我要做的是当不可反序列化的消息进入输入主题时,将不可反序列化的消息发送到 dlq。

所以我尝试了下面的配置,但是 spring cloud stream 应用程序不断重试并说

引起:org.apache.kafka.common.errors.SerializationException:反序列化 id -1 的 Avro 消息时出错

spring.cloud.stream:
  function:
    definition: functionName
  bindings:
    functionName-in-0:
      group: group-name
      destination: input-topic
  kafka:
    binder:
      brokers: localhost:9092
    bindings:
      functionName-in-0:
        consumer:
          enableDlq: true
          dlqName: input-topic-dlq
          autoCommitOnError: true
          autoCommitOffset: true
    default:
      consumer:
        configuration:
          schema.registry.url: http://localhost:8081
          key.deserializer: org.apache.kafka.common.serialization.StringDeserializer
          value.deserializer: io.confluent.kafka.serializers.KafkaAvroDeserializer

我究竟做错了什么?请帮忙。

4

1 回答 1

1

请不要在多个地方问同一个问题,这是浪费您和我们的时间;正如我已经在 Gitter 上回答的那样:

此错误发生在堆栈中太远,以至于 spring-cloud-stream 无法帮助解决它。您需要使用 aListenerContainerCustomizer @Bean来配置 aSeekToCurrentErrorHandlerDeadLetterPublishingRecoverer配置 a ErrorHandlingDeserializer

https://docs.spring.io/spring-kafka/docs/current/reference/html/#error-handling-deserializer

https://docs.spring.io/spring-kafka/docs/current/reference/html/#seek-to-current

https://docs.spring.io/spring-kafka/docs/current/reference/html/#dead-letters

也就是说,Stack Overflow 是解决此类问题的更好媒介。

于 2021-02-18T15:36:58.020 回答