6

我正在使用 Debezium (0.7.5) MySQL 连接器,如果我想使用选项更新此配置,我试图了解什么是最佳方法table.whitelist

假设我创建了一个连接器,如下所示:

curl -i -X POST -H "Accept:application/json" -H  "Content-Type:application/json" http://debezium-host/connectors/ -d '
{
  "name": "MyConnector",
  "config": {
      "connector.class": "io.debezium.connector.mysql.MySqlConnector",
      "connect.timeout.ms": "60000",
      "tasks.max": "1",
      "database.hostname": "myhost",
      "database.port": "3306",
      "database.user": "***",
      "database.password": "***",
      "database.server.id": "3227197",
      "database.server.name": "MyServer",
      "database.whitelist": "myDb",
      "table.whitelist": "myDb.table1,myDb.table2",
      "database.history.kafka.bootstrap.servers": "kb0:9092,kb1:9092,kb2:9092",
      "database.history.kafka.topic": "MyConnectorHistoryTopic",
      "max.batch.size": "1024",
      "snapshot.mode": "initial",
      "decimal.handling.mode": "double"
    }
}'

一段时间后(2 周),我需要在myDb.table3这个选项中添加一个新表()table.whitelist(这个表是旧表,它是在连接器之前创建的)

我尝试的是:

  • 暂停连接器。
  • 删除了历史主题(也许这是问题所在?)。
  • 通过 API 更新配置端点更新配置。
  • 恢复连接器。

通过 API 更新命令:

curl -i -X PUT -H "Accept:application/json" -H  "Content-Type:application/json" https://kafka-connect-host/connectors/MyConnector/config/ -d '
{
  "connector.class": "io.debezium.connector.mysql.MySqlConnector",
  "connect.timeout.ms": "60000",
  "tasks.max": "1",
  "database.hostname": "myhost",
  "database.port": "3306",
  "database.user": "***",
  "database.password": "***",
  "database.server.id": "3227197",
  "database.server.name": "MyServer",
  "database.whitelist": "myDb",
  "table.whitelist": "myDb.table1,myDb.table2,myDb.table3",
  "database.history.kafka.bootstrap.servers": "kb0:9092,kb1:9092,kb2:9092",
  "database.history.kafka.topic": "MyConnectorHistoryTopic",
  "max.batch.size": "1024",
  "snapshot.mode": "schema_only",
  "decimal.handling.mode": "double"
}'

但它没有用,也许这根本不是最好的方法。在其他连接器中,我没有使用 option table.whitelist,所以当我需要收听新表时,我没有这个问题。

我的最后一个选择是删除此连接器并使用此新配置创建另一个连接器,同时监听新表(myDb.table3)。myDb.table3问题是,如果我想要来自我必须使用快照创建的初始数据,initial但我不想从其他表的快照中生成所有消息myDb.table1,myDb.table2

4

3 回答 3

3

目前尚不支持更改白名单/黑名单配置。目前正在处理此问题(请参阅DBZ-175),我们希望在下一个版本中对此提供预览支持。对此有一个待定的 PR,不过需要更多的工作。

在此实现之前,您最好的选择是设置一个新的连接器实例,它只捕获您感兴趣的其他表。这是以运行两个连接器为代价的(它们都将维护一个 binlog 阅读器会话) ,但只要您不需要太频繁地更改过滤器配置,它就可以解决问题。

于 2018-11-30T09:08:23.953 回答
2

Debezium Server 最新版本,可以添加如下配置

debezium.snapshot.new.tables=parallel

如果你使用 Debezium,你可以试试这个配置值

snapshot.new.tables=parallel

注意:Debeziyum 服务器是支持 Kinesis、Google Pub sub 和 Apache Pulsar 的服务器。我正在使用它,它的配置有点不同。我必须在每个项目之前添加“debezium”

添加此配置后,任何添加到 tables.whitelist,对于这些额外的表,Debezium 都会创建快照。

我无法将您指向文档,但我在 GitHub 中浏览了他们的代码,并且我实际尝试了它对我有用。这是 MySqlConnector 代码的链接

https://github.com/debezium/debezium/blob/master/debezium-connector-mysql/src/main/java/io/debezium/connector/mysql/MySqlConnectorConfig.java

在那里搜索 Field.create("snapshot.new.tables")

就个人而言,我觉得 Debezium 有很多东西,但文档很分散。

于 2021-01-05T13:34:37.970 回答
0

我有同样的问题,并用 debezium 的信号表解决。它以这种方式工作,您必须创建一个表以发送到数据表中的 debezium 命令。

CREATE TABLE public.debezium_signal (id VARCHAR(42) PRIMARY KEY, type VARCHAR(32)  NULL, data VARCHAR(2048)  NULL);

并在你的配置中设置做 debzium 一个标签"signal.data.collection": "public.debezium_signal"

之后,您可以在该表中使用 insert 发送命令:

INSERT INTO debezium_signal (id, type, data)
VALUES(gen_random_uuid(),'execute-snapshot','{"data-collections": "myDb.table3"]}');

在我的情况下,我必须在 table.include.list 和 column.include.list 中的列中添加 de table 信号。

https://debezium.io/documentation/reference/stable/configuration/signalling.html

于 2022-02-03T18:50:47.483 回答