0

我按照以下步骤操作:

# Run postgres instance
docker run --name postgres -p 5000:5432 debezium/postgres

# Run zookeeper instance
docker run -it --name zookeeper -p 2181:2181 -p 2888:2888 -p 3888:3888 debezium/zookeeper

# Run kafka instance
docker run -it --name kafka -p 9092:9092 --link zookeeper:zookeeper debezium/kafka

# Run kafka connect
docker run -it --name connect -p 8083:8083 -e GROUP_ID=1 -e CONFIG_STORAGE_TOPIC=my-connect-configs -e OFFSET_STORAGE_TOPIC=my-connect-offsets -e ADVERTISED_HOST_NAME=$(echo $DOCKER_HOST | cut -f3  -d'/' | cut -f1 -d':') --link zookeeper:zookeeper --link postgres:postgres --link kafka:kafka debezium/connect

# Open psql console
psql -h localhost -p 5000 -U postgres
CREATE DATABASE inventory;
CREATE TABLE dumb_table(id SERIAL PRIMARY KEY, name VARCHAR);


# Create connector using kafka connect
curl -X POST -H "Accept:application/json" -H "Content-Type:application/json" localhost:8083/connectors/ -d '
{
    "name": "inventory-connector",
    "config": {
        "connector.class": "io.debezium.connector.postgresql.PostgresConnector",
        "tasks.max": "1",
        "database.hostname": "postgres",
        "database.port": "5432",
        "database.user": "postgres",
        "database.password": "postgres",
        "database.dbname" : "inventory",
        "database.server.name": "dbserver1",
        "database.whitelist": "inventory",
        "database.history.kafka.bootstrap.servers": "kafka:9092",
        "database.history.kafka.topic": "schema-changes.inventory"
    }
}' | jq

# Verify created
curl -H "Accept:application/json" localhost:8083/connectors/ | jq

# Verify configuration
curl -X GET -H "Accept:application/json" localhost:8083/connectors/inventory-connector | jq


# Start a console viewer on kafka

docker run -it --name watcher --rm --link zookeeper:zookeeper debezium/kafka watch-topic -a -k dbserver1.public.dumb_table
# Verify the existence of replication slot in postgres 
SELECT slot_name, plugin, slot_type, database, active, restart_lsn, confirmed_flush_lsn 
FROM pg_replication_slots;

但在这些步骤结束时,我只在 debezium 终端中看到错误日志消息。如何查看所有日志消息?

4

1 回答 1

0

打开 psql 控制台

psql -h localhost -p 5000 -U postgres

创建数据库库存;

在此语句之后,您需要连接到刚刚创建的库存数据库,因为 postgres 是默认数据库。

在 psql 提示符下键入:

\c inventory

这将连接到库存数据库。

连接后,您现在创建如下表:

CREATE TABLE dumb_table(id SERIAL PRIMARY KEY, name VARCHAR);

现在在同一个数据库下的同一张表上发出一些插入命令,即库存

INSERT INTO dumb_table VALUES (1, 'VALUE1');

您将能够在消费者控制台中看到更改。

在创建 debezium 连接器以侦听库存数据库的更改时。因此它只监听库存数据库下创建的表的更改(插入/更新/删除)。

于 2019-08-25T15:25:11.190 回答