0

我正在使用 MongoDB 源连接器设置 Kafka 连接器。

配置如下所示:

{
  "name": "MongoSourceConn",
  "config": {
    "name": "MongoSourceConn",
    "connector.class": "com.mongodb.kafka.connect.MongoSourceConnector",
    "key.converter": "org.apache.kafka.connect.json.JsonConverter",
    "value.converter": "org.apache.kafka.connect.json.JsonConverter",
    "key.converter.schemas.enable": false,
    "value.converter.schemas.enable": false,
    "value.converter.schema.registry.url":"http://schema-registry:8081",
    "publish.full.document.only": true,
    "topics": "test_topic",
    "connection.uri": "mongodb://siteUserAdmin:rstatools@rsgadcmgo5:27017",
    "database": "kafka",
    "collection": "test_topic",
    "pipeline": "[{ \"$match\": { \"$and\": [ {\"operationType\": { \"$in\": [ \"update\",\"insert\" ]}}, {\"jobStatus\": {\"$eq\": 5}} ] }} ]"
}
    "transforms":"dropPrefix",
    "transforms.dropPrefix.regex":"kafka.test_topic",
    "transforms.dropPrefix.type":"org.apache.kafka.connect.transforms.RegexRouter",
    "transforms.dropPrefix.replacement":"test_topic"

如果我删除“管道”行,源连接器工作正常,但显然所有文档都会被推送到主题,这不是我想要的。

如果我添加回“管道”行,源连接器不会将任何消息推送到我的主题,我不明白为什么。我错过了什么?这是我们 mongo 中的文档的样子:

{
    "_id" : ObjectId("61570b1d21589e03f8011235"),
    "jobId" : "04bba49d-098b-4d4c-adde-4578d31f20df",
    "jobStatus" : 5,
    "data" : null,
    "createdOn" : "2021-10-01 13:20:29.215691"
}

配置正在通过 rest api 推送,这就是为什么它具有带有所有转义字符 (\") 的“字典”外观。

谢谢。

4

1 回答 1

0

很明显,这条管道永远不会匹配,因为它当前包含{\"operationType\": { \"$in\": [ \"update\",\"insert\" ]}}

您提到您删除了它,但没有看到更多内容,无法确切知道您是如何删除它的,所以那里可能出了点问题。

此外,还不清楚数据在获得后的确切外观。您在 Mongo 中显示一条消息,但可能会被包装到其他内容中(例如,由于变更流),因此字段 jobStatus 可能在顶层可能不可用,但最终嵌套。

我会推荐这些步骤:

  1. 在没有管道的情况下检查您的数据在 kafka 中的外观
  2. 从只做一件事的最简单的管道开始
  3. 玩这个,直到你能够以某种方式使用管道
  4. 然后继续扩展逻辑,直到你回到你想要的

我知道这些步骤有点笼统,但加上上面指出的内容,希望就足够了。

于 2021-10-09T19:51:37.940 回答