1

我将 avro 架构与架构一起使用

    {
  "namespace": "me.escoffier.quarkus",
  "type": "record",
  "name": "Footballer",
  "fields": [
    {
      "name": "id",
      "type": ["null", "string"],
      "default": null
    },
    {
      "name": "name",
      "type": ["null","string"],
      "default": null
    },
    {
      "name": "club",
      "type": ["null","string"],
      "default": null
    }
  ]
}

在 Mongo 中,我的存在数据如下:

{ "_id" : ObjectId("60eeaeace1a32853aad7df0a"), "id" : "2", "name" : "Messi", "club" : "Barca" }

当我发送一个 Avro 对象时:Footballer player = Footballer.newBuilder().setId("2").setClub("MU").build();

我的预期结果:

{ "_id" : ObjectId("60eeaeace1a32853aad7df0a"), "id" : "2", "name" : "Messi", "club" : "MU" }

但实际上:

{ "_id" : ObjectId("60eeaeace1a32853aad7df0a"), "id" : "2", "name" : null, "club" : "MU" }

这是我的 kafka 连接器接收器配置:

connector.class=com.mongodb.kafka.connect.MongoSinkConnector
document.id.strategy.overwrite.existing=true
writemodel.strategy=com.mongodb.kafka.connect.sink.writemodel.strategy.ReplaceOneBusinessKeyStrategy
topics=mongo-sink-test
collection=DeviceDB
value.converter.schema.registry.url=http://schema_registry:8081
database=omniVista
document.id.strategy=com.mongodb.kafka.connect.sink.processor.id.strategy.PartialValueStrategy
document.id.strategy.partial.value.projection.list=id
connection.uri=mongodb://root:rootpassword@mongo:27017
value.converter=io.confluent.connect.avro.AvroConverter
document.id.strategy.partial.value.projection.type=AllowList
key.converter=org.apache.kafka.connect.storage.StringConverter

所以,请帮我正确配置

4

1 回答 1

0

您已经使用空名称构建了一条新记录。

您应该验证这些属性是否符合您的要求,因为您似乎已将连接器配置为替换/覆盖具有相同 ID 的所有文档,而不是“更新”

document.id.strategy.overwrite.existing=true
writemodel.strategy=com.mongodb.kafka.connect.sink.writemodel.strategy.ReplaceOneBusinessKeyStrategy

免责声明:我没有使用 Mongo 连接器的经验

于 2021-07-15T19:42:03.940 回答