5

我有一个已经在运行的生产部署的 Kafka-Cluster 并且主题为“ existing-topic ”。我正在使用来自 Debezium 的 MongoDB-Source-Connector。

在这里,我想要的只是将 CDC 事件直接推送到主题“ existing-topic ”,以便已经在收听该主题的消费者将处理它。

我没有找到任何资源可以这样做,但是提到该主题是按以下格式创建的-

“如果您的 mongodb.name 参数为 A,数据库名称为 B,集合名称为 C,则数据库 A 和集合 C 中的数据将加载到主题 ABC 下”

我可以将主题更改为“现有主题”并将事件推送到它吗?

4

2 回答 2

2

我在使用 JDBC Source Connector 时遇到了同样的问题,并找到了不同的解决方案:

使用RegexRouterSingle Message TransformdropPrefix可以覆盖整个主题名称:

"transforms":"dropPrefix",
"transforms.dropPrefix.regex":"A.B.C",                 // whole created topic name
"transforms.dropPrefix.replacement":"existing-topic"   // whole exisiting topic name

它适用于正则表达式,因此如果您使用多个表/集合并且您创建的主题名称不是恒定的,您应该能够使其动态化。

这有点 hacky,因为从技术上讲,我要删除整个主题名称,然后添加一个新的主题名称——这对我来说不是最好的解决方案。

于 2020-07-30T16:43:12.030 回答
2

根据文件

Kafka 主题的名称始终采用 形式 logicalName.databaseName.collectionName,其中logicalName是 配置属性指定的连接器的逻辑名称,是发生操作的数据库的名称,是受影响文档所在的 MongoDB 集合的名称存在。mongodb.namedatabaseNamecollectionName


这意味着如果您的连接器的逻辑名称是myConnector并且您的数据库myDatabase有两个集合usersorders

{
  "name": "myConnector",  
  "config": {
    "connector.class": "io.debezium.connector.mongodb.MongoDbConnector", 
    "mongodb.hosts": "mongo-db-host:27017", 
    "mongodb.name": "myDatabase", 
    "collection.whitelist": "myDatabase[.]*", 
  }
}

然后 Kafka Connect 将使用名称填充两个主题:

  • myConnector.myDatabase.users
  • myConnector.myDatabase.orders

现在,如果您仍想更改目标主题的名称,可以使用 Kafka Connect Single Message Transforms (SMT)。更准确地说,ExtractTopic应该对你有所帮助。请注意,尽管此 SMT 可帮助您从消息的键或值中提取主题名称,因此您需要以某种方式在有效负载中包含所需的主题名称。

例如,以下 SMT 将提取字段的值myField并将其用作记录的主题:

 transforms.ValueFieldExample.type=io.confluent.connect.transforms.ExtractTopic$Value
 transforms.ValueFieldExample.field=myField
于 2020-05-17T11:47:13.270 回答