2

我正在使用 mongo 源来监听 mongo 更改流并将所有事件放入 kafka,但我正在绞尽脑汁地寻找一种从事件中提取“Real”键的方法。我尝试了转换,但它没有用,给了我错误:

Caused by: org.apache.kafka.connect.errors.DataException: Only Struct objects supported for [copying fields from value to key], found: java.lang.String

在 Mongo 源代码中,我发现了这一

这基本上意味着它甚至没有一些密钥处理,而是寻找“_id”字段(这不是文档的id,它是一个恢复令牌信息)

相反,我想将主题的键设置为“documentKey”。

以下是连接器获取的事件示例:

{
 "_id": {
    "_data": "DSAD45543FFWEHTEY004....."
  },
  "operationType": "replace",
  "clusterTime": {
    "$timestamp": {
      "t": 1446707990,
      "i": 1
    }
  },
  "fullDocument": {
    "_id": {
      "$binary": "FxVFgHFRhrr/z+zUc/w==",
      "$type": "03"
    },
    ...
  },
  "ns": {
    "db": "somedb",
    "coll": "somecol"
  },
  "documentKey": {
    "_id": {
      "$binary": "FxVFgHFRhrr/z+zUc/w==",
      "$type": "03"
    }
  }
}

我使用了以下配置:

"transforms":"createKey",
"transforms.createKey.type":"org.apache.kafka.connect.transforms.ValueToKey",
"transforms.createKey.fields":"documentKey"

我试过了:

org.apache.kafka.connect.json.JsonConverter

还有 StringConverter (虽然我不认为这可以用字符串来完成)

org.apache.kafka.connect.storage.StringConverter

有没有办法提取密钥?请注意:模式已禁用。

4

2 回答 2

1

这是因为 MongoDB Source Connector for Kafka 还不支持它。从 1.3 版开始,它应该支持高级密钥选择。

https://jira.mongodb.org/browse/KAFKA-40

于 2020-07-13T10:17:40.740 回答
0

请注意:架构已禁用

在这种情况下,您不能使用 ValueToKey 转换。但是,即使您可以,该转换也不支持有效负载中的嵌套值,在您的情况下,这类似于documentKey._id.$binary

于 2019-12-25T05:27:33.470 回答