I am using Avro with the following schema:
{
  "namespace": "me.escoffier.quarkus",
  "type": "record",
  "name": "Footballer",
  "fields": [
    {
      "name": "id",
      "type": ["null", "string"],
      "default": null
    },
    {
      "name": "name",
      "type": ["null", "string"],
      "default": null
    },
    {
      "name": "club",
      "type": ["null", "string"],
      "default": null
    }
  ]
}
In Mongo, my existing data looks like this:
{ "_id" : ObjectId("60eeaeace1a32853aad7df0a"), "id" : "2", "name" : "Messi", "club" : "Barca" }
When I send an Avro object: Footballer player = Footballer.newBuilder().setId("2").setClub("MU").build();
My expected result is:
{ "_id" : ObjectId("60eeaeace1a32853aad7df0a"), "id" : "2", "name" : "Messi", "club" : "MU" }
But the actual result is:
{ "_id" : ObjectId("60eeaeace1a32853aad7df0a"), "id" : "2", "name" : null, "club" : "MU" }
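For illustration, the gap between the expected and the actual document looks like the gap between replacing a whole document and merging only the non-null fields into it. Avro's builder fills every unset field with its schema default on build(), so the record sent above carries name = null rather than no name at all. The following is a minimal sketch with plain Java maps, not the connector's code: the class ReplaceVsMerge and the helpers doc, replace and mergeNonNull are hypothetical names introduced only for this example, and the "_id" value is stored as a plain string for simplicity.

```java
import java.util.LinkedHashMap;
import java.util.Map;

public class ReplaceVsMerge {

    // Hypothetical helper: builds a document from key/value pairs, null values allowed.
    public static Map<String, Object> doc(Object... keyValues) {
        Map<String, Object> d = new LinkedHashMap<>();
        for (int i = 0; i < keyValues.length; i += 2) {
            d.put((String) keyValues[i], keyValues[i + 1]);
        }
        return d;
    }

    // Full replacement: keeps only _id from the stored document,
    // every other field comes from the incoming record, nulls included.
    public static Map<String, Object> replace(Map<String, Object> stored,
                                              Map<String, Object> incoming) {
        Map<String, Object> result = new LinkedHashMap<>();
        result.put("_id", stored.get("_id"));
        result.putAll(incoming);
        return result;
    }

    // Partial merge: starts from the stored document and overwrites
    // only the fields whose incoming value is not null.
    public static Map<String, Object> mergeNonNull(Map<String, Object> stored,
                                                   Map<String, Object> incoming) {
        Map<String, Object> result = new LinkedHashMap<>(stored);
        for (Map.Entry<String, Object> e : incoming.entrySet()) {
            if (e.getValue() != null) {
                result.put(e.getKey(), e.getValue());
            }
        }
        return result;
    }

    public static void main(String[] args) {
        Map<String, Object> stored = doc(
                "_id", "60eeaeace1a32853aad7df0a",
                "id", "2", "name", "Messi", "club", "Barca");
        // What the Avro record effectively contains after build(): name defaults to null.
        Map<String, Object> incoming = doc("id", "2", "name", null, "club", "MU");

        System.out.println(replace(stored, incoming));      // name ends up null
        System.out.println(mergeNonNull(stored, incoming)); // name is still Messi
    }
}
```

The first call corresponds to the actual result, the second to the expected one. Whether the sink connector can be configured to behave like the second call is the open question here.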
This is my Kafka sink connector configuration:
connector.class=com.mongodb.kafka.connect.MongoSinkConnector
document.id.strategy.overwrite.existing=true
writemodel.strategy=com.mongodb.kafka.connect.sink.writemodel.strategy.ReplaceOneBusinessKeyStrategy
topics=mongo-sink-test
collection=DeviceDB
value.converter.schema.registry.url=http://schema_registry:8081
database=omniVista
document.id.strategy=com.mongodb.kafka.connect.sink.processor.id.strategy.PartialValueStrategy
document.id.strategy.partial.value.projection.list=id
connection.uri=mongodb://root:rootpassword@mongo:27017
value.converter=io.confluent.connect.avro.AvroConverter
document.id.strategy.partial.value.projection.type=AllowList
key.converter=org.apache.kafka.connect.storage.StringConverter
So please help me configure it correctly.