2

我正在使用具有以下配置的 KafkaConnect - MongoSource:

curl -X PUT http://localhost:8083/connectors/mongo-source2/config -H "Content-Type: application/json" -d '{
  "name":"mongo-source2",
  "tasks.max":1,
  "connector.class":"com.mongodb.kafka.connect.MongoSourceConnector",
  "key.converter":"org.apache.kafka.connect.storage.StringConverter",
  "value.converter":"org.apache.kafka.connect.storage.StringConverter",
  "connection.uri":"mongodb://xxx:xxx@localhost:27017/mydb",
  "database":"mydb",
  "collection":"claimmappingrules.66667777-8888-9999-0000-666677770000",
  "pipeline":"[{\"$addFields\": {\"something\":\"xxxx\"} }]",
  "transforms":"dropTopicPrefix",
  "transforms.dropTopicPrefix.type":"org.apache.kafka.connect.transforms.RegexRouter",
  "transforms.dropTopicPrefix.regex":".*",
  "transforms.dropTopicPrefix.replacement":"my-topic"
}'

出于某种原因,当我使用消息时,我得到了一个奇怪的键:

 "_id": {
"_data": "825DFD2A53000000012B022C0100296E5A1004060C0FB7484A4990A7363EF5F662CF8D465A5F6964005A1003F9974744D06AFB498EF8D78370B0CD440004"
  }

我不知道它是从哪里来的,我的 mongo 文档的 _id 是 UUID,当我使用消息时,我应该在我的消费者密钥上看到 documentKey 字段。

以下是连接器发布到 kafka 的消息示例:

{
  "_id": {
    "_data": "825DFD2A53000000012B022C0100296E5A1004060C0FB7484A4990A7363EF5F662CF8D465A5F6964005A1003F9974744D06AFB498EF8D78370B0CD440004"
  },
  "operationType": "replace",
  "clusterTime": {
    "$timestamp": {
      "t": 1576872531,
      "i": 1
    }
  },
  "fullDocument": {
    "_id": {
      "$binary": "+ZdHRNBq+0mO+NeDcLDNRA==",
      "$type": "03"
    },
    ...
  },
  "ns": {
    "db": "security",
    "coll": "users"
  },
  "documentKey": {
    "_id": {
      "$binary": "+ZdHRNBq+0mO+NeDcLDNRA==",
      "$type": "03"
    }
  }
}
4

1 回答 1

0

与 Kafka 连接配置模式相关的文档确实有限。我知道现在回复为时已晚,但最近我也遇到了同样的问题,并通过反复试验找到了解决方案。

我将这两个配置添加到我的 mongodb-kafka-connect 配置中 -

"output.format.key": "schema",
"output.schema.key": "{\"name\":\"sampleId\",\"type\":\"record\",\"namespace\":\"com.mongoexchange.avro\",\"fields\":[{\"name\":\"documentKey._id\",\"type\":\"string\"}]}",

但即使在此之后,我仍然不知道更改流的 resume_token 作为 kafka 分区分配的关键是否在性能方面具有任何意义,甚至对于由于长时间不活动而导致 resume_token 过期的情况。

PS - 我的 mongodb 作为源的 kafka 连接配置的最终版本是这样的 -

{
  "tasks.max": 1,
  "connector.class": "com.mongodb.kafka.connect.MongoSourceConnector",
  "key.converter": "org.apache.kafka.connect.storage.StringConverter",
  "value.converter": "org.apache.kafka.connect.storage.StringConverter",
  "connection.uri": "mongodb://example-mongodb-0:27017,example-mongodb-1:27017,example-mongodb-2:27017/?replicaSet=replicaSet",
  "database": "exampleDB",
  "collection": "exampleCollection",
  "output.format.key": "schema",
  "output.schema.key": "{\"name\":\"ClassroomId\",\"type\":\"record\",\"namespace\":\"com.mongoexchange.avro\",\"fields\":[{\"name\":\"documentKey._id\",\"type\":\"string\"}]}",
  "change.stream.full.document": "updateLookup",
  "copy.existing": "true",
  "topic.prefix": "mongodb"
}
于 2021-04-07T10:25:25.373 回答