我正在使用 Debezium (0.7.5) MySQL 连接器,如果我想使用选项更新此配置,我试图了解什么是最佳方法table.whitelist
。
假设我创建了一个连接器,如下所示:
curl -i -X POST -H "Accept:application/json" -H "Content-Type:application/json" http://debezium-host/connectors/ -d '
{
"name": "MyConnector",
"config": {
"connector.class": "io.debezium.connector.mysql.MySqlConnector",
"connect.timeout.ms": "60000",
"tasks.max": "1",
"database.hostname": "myhost",
"database.port": "3306",
"database.user": "***",
"database.password": "***",
"database.server.id": "3227197",
"database.server.name": "MyServer",
"database.whitelist": "myDb",
"table.whitelist": "myDb.table1,myDb.table2",
"database.history.kafka.bootstrap.servers": "kb0:9092,kb1:9092,kb2:9092",
"database.history.kafka.topic": "MyConnectorHistoryTopic",
"max.batch.size": "1024",
"snapshot.mode": "initial",
"decimal.handling.mode": "double"
}
}'
一段时间后(2 周),我需要在myDb.table3
这个选项中添加一个新表()table.whitelist
(这个表是旧表,它是在连接器之前创建的)
我尝试的是:
- 暂停连接器。
- 删除了历史主题(也许这是问题所在?)。
- 通过 API 更新配置端点更新配置。
- 恢复连接器。
通过 API 更新命令:
curl -i -X PUT -H "Accept:application/json" -H "Content-Type:application/json" https://kafka-connect-host/connectors/MyConnector/config/ -d '
{
"connector.class": "io.debezium.connector.mysql.MySqlConnector",
"connect.timeout.ms": "60000",
"tasks.max": "1",
"database.hostname": "myhost",
"database.port": "3306",
"database.user": "***",
"database.password": "***",
"database.server.id": "3227197",
"database.server.name": "MyServer",
"database.whitelist": "myDb",
"table.whitelist": "myDb.table1,myDb.table2,myDb.table3",
"database.history.kafka.bootstrap.servers": "kb0:9092,kb1:9092,kb2:9092",
"database.history.kafka.topic": "MyConnectorHistoryTopic",
"max.batch.size": "1024",
"snapshot.mode": "schema_only",
"decimal.handling.mode": "double"
}'
但它没有用,也许这根本不是最好的方法。在其他连接器中,我没有使用 option table.whitelist
,所以当我需要收听新表时,我没有这个问题。
我的最后一个选择是删除此连接器并使用此新配置创建另一个连接器,同时监听新表(myDb.table3
)。myDb.table3
问题是,如果我想要来自我必须使用快照创建的初始数据,initial
但我不想从其他表的快照中生成所有消息myDb.table1,myDb.table2
。