我正在使用以下依赖项,
<dependency>
<groupId>io.debezium</groupId>
<artifactId>debezium-connector-oracle</artifactId>
<version>${version.debezium}</version>
</dependency>
<!-- https://mvnrepository.com/artifact/io.debezium/debezium-connector-mysql -->
<dependency>
<groupId>io.debezium</groupId>
<artifactId>debezium-connector-mysql</artifactId>
<version>${version.debezium}</version>
</dependency>
<version.debezium>0.8.3.Final</version.debezium>
下面是我的java方法,
public void runMysqlParsser() {
Configuration config = Configuration.create()
/* begin engine properties */
.with("connector.class",
"io.debezium.connector.mysql.MySqlConnector")
.with("offset.storage",
"org.apache.kafka.connect.storage.FileOffsetBackingStore")
.with("offset.storage.file.filename",
"/home/mohit/tmp/offset.dat")
.with("offset.flush.interval.ms", 60000)
/* begin connector properties */
.with("name", "my-sql-connector")
.with("database.hostname", "localhost")
.with("database.port", 3306)
.with("database.user", "root")
.with("database.password", "root")
.with("server.id", 1)
.with("database.server.name", "my-app-connector")
.with("database.history",
"io.debezium.relational.history.FileDatabaseHistory")
.with("database.history.file.filename",
"/home/mohit/tmp/dbhistory.dat")
.with("database.whitelist", "mysql")
.with("table.whitelist", "mysql.customers")
.build();
EmbeddedEngine engine = EmbeddedEngine.create()
.using(config)
.notifying(this::handleEvent)
.build();
Executor executor = Executors.newSingleThreadExecutor();
executor.execute(engine);
}
private void handleEvent(SourceRecord sourceRecord) {
try {
LOG.info("Got record :" + sourceRecord.toString());
} catch (Exception ex) {
LOG.info("exception in handle event:" + ex);
}
我的 sql 配置,.
general_log_file = /var/log/mysql/mysql.log
general_log = 1
server-id = 1
log_bin = /var/log/mysql/mysql-bin.log
expire_logs_days = 10
max_binlog_size = 100M
binlog_format = row
binlog_row_image = full
binlog_rows_query_log_events = on
gtid_mode = on
enforce_gtid_consistency = on
当我运行这段代码时,我得到了历史日志的偏移量,mysql.log 文件也得到了添加到它的偏移量。但是,当我对表执行任何更新语句时,它没有给我任何日志,即没有调用 handleEvent 方法。谁能告诉我代码或配置有什么问题?
下面是运行java代码后的日志,
$$ java -jar debezium-gcp-1.0-SNAPSHOT-jar-with-dependencies.jar
log4j:WARN 找不到记录器 (org.apache.kafka.connect.json.JsonConverterConfig) 的附加程序。log4j:WARN 请正确初始化 log4j 系统。
log4j:WARN 见http://logging.apache.org/log4j/1.2/faq.html#noconfig了解更多信息。2018 年 11 月 28 日下午 1:29:47 com.debezium.gcp.SampleMysqlEmbededDebezium handleEvent INFO:得到记录:SourceRecord{sourcePartition={server=my-app-connector},sourceOffset={file=mysql-bin.000002,pos= 980, gtids=31b708c7-ee22-11e8-b8a3-080027fbf50e:1-17, 快照=true}} ConnectRecord{topic='my-app-connector', kafkaPartition=0, key=Struct{databaseName=}, value=Struct {source=Struct{version=0.8.3.Final,name=my-app-connector,server_id=0,ts_sec=0,file=mysql-bin.000002,pos=980,row=0,snapshot=true}, databaseName=,ddl=SET character_set_server=latin1, collation_server=latin1_swedish_ci;}, timestamp=null, headers=ConnectHeaders(headers=)} 2018 年 11 月 28 日下午 1:29:47 com.github.shyiko.mysql.binlog.BinaryLogClient 连接信息:在 31b708c7-ee22-11e8-b8a3-080027fbf50e:1-17 连接到 localhost:3306 (sid:6326, cid:21)