我正在尝试使用来自已在 Avro 中序列化的主题的消息。文档对于这是如何工作的非常混乱。https://cloud.spring.io/spring-cloud-static/spring-cloud-stream-binder-kafka/3.0.0.M3/reference/html/spring-cloud-stream-binder-kafka.html#_inbound_deserialization
我试图阅读的消息是 avro 序列化消息。我在同一个项目中有键和值的模式,并从模式中生成了类 - 键和值。
我的困惑是,有一些独特的应用程序属性和代码组合可以使其工作。现在我似乎弄错了,我一直在尝试使用一堆属性和代码组合,但它们都不起作用。
我不断收到的错误是
Caused by: org.apache.kafka.common.errors.SerializationException: Can't deserialize data [[0, 0, 0, 0, 7, -46, 15]] from topic [dbserver1.inventory.customers]
Caused by: com.fasterxml.jackson.core.JsonParseException: Illegal character ((CTRL-CHAR, code 0)): only regular white space (\r, \n, \t) is allowed between tokens
at [Source: (byte[])"�"; line: 1, column: 2]
at com.fasterxml.jackson.core.JsonParser._constructError(JsonParser.java:1840)
at com.fasterxml.jackson.core.base.ParserMinimalBase._reportError(ParserMinimalBase.java:712)
at com.fasterxml.jackson.core.base.ParserMinimalBase._throwInvalidSpace(ParserMinimalBase.java:690)
看起来默认的 json 序列化器正在启动并试图反序列化 avro 序列化消息。
我的代码如下所示
@SpringBootApplication
class SpringBootKafkaConsumer {
@Bean
fun process(): Consumer<KStream<SpecificAvroSerde<Key>, SpecificAvroSerde<Value>>> {
return Consumer { input -> input.foreach { key, value ->
println("============key = $key")
println("===========value = $value")
}}
}
}
fun main(args: Array<String>) {
runApplication<SpringBootKafkaConsumer>(*args)
}
应用程序.yml
spring:
application:
name: customer-balance
cloud:
stream:
kafka:
streams:
binder:
configuration:
application:
id: customer-balance-1
consumer-properties:
key.deserializer: io.confluent.kafka.serializers.KafkaAvroDeserializer
value.deserializer: io.confluent.kafka.serializers.KafkaAvroDeserializer
schema.registry.url: http://localhost:8081
specific.avro.reader: true
bindings:
process_in:
destination: "dbserver1.inventory.customers"
keySerde: io.confluent.kafka.streams.serdes.avro.SpecificAvroSerde
valueSerde: io.confluent.kafka.streams.serdes.avro.SpecificAvroSerde
nativeDecoding: true
startOffset: earliest
content-type: application/*+avro
logging:
level:
org.springframework.kafka.config: trace