1

我正在使用 Debezium 运行读取二进制日志,但是当我启动新的读取线程时,它会从头开始读取表的所有创建语句,但我不需要它们(op=c)。我需要处理第一次运行代码后发生的创建/更新/删除事件。而不是使用正确的偏移量(存储在文件“tmp/offsets.dat”中),那么我如何以这种方式设置初始配置?所以流程需要是下一个:

  • 开始阅读(第一次)-> 获取当前(binlog 中的最新位置并保存,从这里开始工作)并处理最新事件
  • 开始阅读(不是第一次运行)-> 从文件中获取最新位置并照常读取数据

这是我目前的配置

  config = Configuration.empty().withSystemProperties(Function.identity()).edit()
            .with(MySqlConnectorConfig.SERVER_NAME, SERVER_NAME)
            .with(MySqlConnectorConfig.SKIPPED_OPERATIONS, "r")
            .with(MySqlConnectorConfig.HOSTNAME, HOSTNAME)
            .with(MySqlConnectorConfig.PORT, PORT)
            .with(MySqlConnectorConfig.USER, USER)
            .with(MySqlConnectorConfig.PASSWORD, PASSWORD)
            .with(MySqlConnectorConfig.TABLE_WHITELIST, TABLE_WHITELIST)
            .with(MySqlConnectorConfig.SERVER_ID, 100)
            // 
            .with(EmbeddedEngine.OFFSET_STORAGE, "org.apache.kafka.connect.storage.FileOffsetBackingStore")
            .with(EmbeddedEngine.OFFSET_STORAGE_FILE_FILENAME, "tmp/offsets.dat")
            .with(EmbeddedEngine.CONNECTOR_CLASS, "io.debezium.connector.mysql.MySqlConnector")
            .with(EmbeddedEngine.ENGINE_NAME, SERVER_NAME)
            //
            .with(MySqlConnectorConfig.DATABASE_HISTORY, "io.debezium.relational.history.FileDatabaseHistory")
            .with("database.history.file.filename", "tmp/dbhistory.dat")
            // Send JSON without schema
            .with("schemas.enable", false)
            .build();

my.cnf二进制日志的值

[mysqld]
log-bin=mysql-bin.log
server_id=100
binlog_row_image=full
binlog-format=row
expire_logs_days =10
4

0 回答 0