我正在使用 MongoDB 源连接器设置 Kafka 连接器。
配置如下所示:
{
"name": "MongoSourceConn",
"config": {
"name": "MongoSourceConn",
"connector.class": "com.mongodb.kafka.connect.MongoSourceConnector",
"key.converter": "org.apache.kafka.connect.json.JsonConverter",
"value.converter": "org.apache.kafka.connect.json.JsonConverter",
"key.converter.schemas.enable": false,
"value.converter.schemas.enable": false,
"value.converter.schema.registry.url":"http://schema-registry:8081",
"publish.full.document.only": true,
"topics": "test_topic",
"connection.uri": "mongodb://siteUserAdmin:rstatools@rsgadcmgo5:27017",
"database": "kafka",
"collection": "test_topic",
"pipeline": "[{ \"$match\": { \"$and\": [ {\"operationType\": { \"$in\": [ \"update\",\"insert\" ]}}, {\"jobStatus\": {\"$eq\": 5}} ] }} ]"
}
"transforms":"dropPrefix",
"transforms.dropPrefix.regex":"kafka.test_topic",
"transforms.dropPrefix.type":"org.apache.kafka.connect.transforms.RegexRouter",
"transforms.dropPrefix.replacement":"test_topic"
如果我删除“管道”行,源连接器工作正常,但显然所有文档都会被推送到主题,这不是我想要的。
如果我添加回“管道”行,源连接器不会将任何消息推送到我的主题,我不明白为什么。我错过了什么?这是我们 mongo 中的文档的样子:
{
"_id" : ObjectId("61570b1d21589e03f8011235"),
"jobId" : "04bba49d-098b-4d4c-adde-4578d31f20df",
"jobStatus" : 5,
"data" : null,
"createdOn" : "2021-10-01 13:20:29.215691"
}
配置正在通过 rest api 推送,这就是为什么它具有带有所有转义字符 (\") 的“字典”外观。
谢谢。