根据文件,
Kafka 主题的名称始终采用 形式
logicalName.databaseName.collectionName
,其中logicalName
是
配置属性指定的连接器的逻辑名称,是发生操作的数据库的名称,是受影响文档所在的 MongoDB 集合的名称存在。mongodb.name
databaseName
collectionName
这意味着如果您的连接器的逻辑名称是myConnector
并且您的数据库myDatabase
有两个集合users
和orders
{
"name": "myConnector",
"config": {
"connector.class": "io.debezium.connector.mongodb.MongoDbConnector",
"mongodb.hosts": "mongo-db-host:27017",
"mongodb.name": "myDatabase",
"collection.whitelist": "myDatabase[.]*",
}
}
然后 Kafka Connect 将使用名称填充两个主题:
myConnector.myDatabase.users
myConnector.myDatabase.orders
现在,如果您仍想更改目标主题的名称,可以使用 Kafka Connect Single Message Transforms (SMT)。更准确地说,ExtractTopic
应该对你有所帮助。请注意,尽管此 SMT 可帮助您从消息的键或值中提取主题名称,因此您需要以某种方式在有效负载中包含所需的主题名称。
例如,以下 SMT 将提取字段的值myField
并将其用作记录的主题:
transforms.ValueFieldExample.type=io.confluent.connect.transforms.ExtractTopic$Value
transforms.ValueFieldExample.field=myField