2

是否可以在不使用of和定义@Configuration类的情况下将类型信息传递给 Kafka 消费者的反序列化器?@BeanconsumerFactory()containerFactory

在该@Bean方法中,我必须将 yml 文件中已有的所有配置再次放入地图并将其传递给工厂的构造函数,但我认为这是一种开销。我想找到一种方法将所有配置保存在 yaml/properties 中。(而且我认为将配置放入 yaml 而不是代码中更简洁)

我想为每个带有注释的消费者/每个方法指定类型,@KafkaListener因为我将收到不同的 JSON,映射到其相应的 DTO,所以通用spring.json.value.default.type在这里不适用。(而且我认为它很难看)

我正在使用我的代码进行测试(Spring Kafka Test with an EmbeddedKafka),但现在它抱怨找不到类型信息:

o.s.k.listener.LoggingErrorHandler - Error while processing: null org.apache.kafka.common.errors.SerializationException: Error deserializing key/value for partition checkout_order_paid-0 at offset 0. If needed, please seek past the record to continue consumption.
Caused by: java.lang.IllegalStateException: No type information in headers and no default type provided
    at org.springframework.util.Assert.state(Assert.java:73)
    at org.springframework.kafka.support.serializer.JsonDeserializer.deserialize(JsonDeserializer.java:326)
    at org.apache.kafka.clients.consumer.internals.Fetcher.parseRecord(Fetcher.java:1041)
    at org.apache.kafka.clients.consumer.internals.Fetcher.access$3300(Fetcher.java:110)
    at org.apache.kafka.clients.consumer.internals.Fetcher$PartitionRecords.fetchRecords(Fetcher.java:1223)
    at org.apache.kafka.clients.consumer.internals.Fetcher$PartitionRecords.access$1400(Fetcher.java:1072)
    at org.apache.kafka.clients.consumer.internals.Fetcher.fetchRecords(Fetcher.java:562)
    at org.apache.kafka.clients.consumer.internals.Fetcher.fetchedRecords(Fetcher.java:523)
    at org.apache.kafka.clients.consumer.KafkaConsumer.pollForFetches(KafkaConsumer.java:1230)
    at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1187)
    at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1154)
    at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.pollAndInvoke(KafkaMessageListenerContainer.java:719)
    at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.run(KafkaMessageListenerContainer.java:676)
    at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
    at java.util.concurrent.FutureTask.run$$$capture(FutureTask.java:266)
    at java.util.concurrent.FutureTask.run(FutureTask.java)
    at java.lang.Thread.run(Thread.java:748)

我知道文档,但我不知道如何正确执行:

https://docs.spring.io/spring-kafka/reference/html/#spring-messaging-message-conversion

4

0 回答 0