我正在尝试使用 Kafka 连接实时连接复制表。使用的数据库是 MySQLv5.7。在分别使用插入和更新模式时,列的行为符合预期。但是,当我使用 upsert 模式时,数据库中没有观察到任何变化。
通过 UI 填充的配置文件
下沉
topic = custom-p2p
Connector Class = JdbcSinkConnector
name = sink
tasks-max = 1
Key-converter-class=org.apache.kafka.connect.storage.StringConverter
Value-converter-class=org.apache.kafka.connect.json.JsonConverter
jdbc_url=jdbc:mysql://127.0.0.1:3306/p2p_service_db4?user=root&password=root&useSSL=false
insert mode = upsert
auto create = true
auto evolve = true
资源
Connector Class = JdbcSourceConnector
name = source-new
task max = 1
key converter class = org.apache.kafka.connect.storage.StringConverter
value converter class = org.apache.kafka.connect.json.JsonConverter
jdbc url = jdbc:mysql://127.0.0.1:3306/p2p_service_db3?user=root&password=root&useSSL=false
table loading mode = timestamp+incrementing
incrementing column name = auto_id
timestamp column name = last_updated_at
topic prefix = custom-
我遇到的问题是,当接收器插入模式更改为插入时,插入在更改为更新时正确进行,这也按预期完美发生,但是当值更改为 upsert 时,既不插入也不更新发生。请让我知道是否做错了什么?为什么这种模式不起作用?如果此插入和更新都需要在备份数据库中复制,是否有其他替代方法。
先感谢您。让我知道是否需要其他信息