我正在使用 mongo 源来监听 mongo 更改流并将所有事件放入 kafka,但我正在绞尽脑汁地寻找一种从事件中提取“Real”键的方法。我尝试了转换,但它没有用,给了我错误:
Caused by: org.apache.kafka.connect.errors.DataException: Only Struct objects supported for [copying fields from value to key], found: java.lang.String
在 Mongo 源代码中,我发现了这一行
这基本上意味着它甚至没有一些密钥处理,而是寻找“_id”字段(这不是文档的id,它是一个恢复令牌信息)
相反,我想将主题的键设置为“documentKey”。
以下是连接器获取的事件示例:
{
"_id": {
"_data": "DSAD45543FFWEHTEY004....."
},
"operationType": "replace",
"clusterTime": {
"$timestamp": {
"t": 1446707990,
"i": 1
}
},
"fullDocument": {
"_id": {
"$binary": "FxVFgHFRhrr/z+zUc/w==",
"$type": "03"
},
...
},
"ns": {
"db": "somedb",
"coll": "somecol"
},
"documentKey": {
"_id": {
"$binary": "FxVFgHFRhrr/z+zUc/w==",
"$type": "03"
}
}
}
我使用了以下配置:
"transforms":"createKey",
"transforms.createKey.type":"org.apache.kafka.connect.transforms.ValueToKey",
"transforms.createKey.fields":"documentKey"
我试过了:
org.apache.kafka.connect.json.JsonConverter
还有 StringConverter (虽然我不认为这可以用字符串来完成)
org.apache.kafka.connect.storage.StringConverter
有没有办法提取密钥?请注意:模式已禁用。