13

我正在尝试从我的 PostgreSQL 数据库中获取某些表的更新流。获取所有更新的常规方式如下所示:

您创建一个逻辑复制槽

pg_create_logical_replication_slot('my_slot', 'wal2json');

并使用pg_recvlogical或进行特殊的 SQL 查询连接到它。这允许您以 json 格式从数据库中获取所有操作(如果您使用了 wal2json插件或类似插件),然后对这些数据执行任何您想要的操作。

但是在 PostgreSQL 10 中,我们有发布/订阅机制,它允许我们只复制选定的表。这非常方便,因为没有发送很多无用的数据。该过程如下所示:

首先,您创建一个出版物

CREATE PUBLICATION foo FOR TABLE herp, derp;

然后您从另一个数据库订阅该出版物

CREATE SUBSCRIPTION mysub CONNECTION <connection stuff> PUBLICATION foo;

这会在后台在主数据库上创建一个复制槽,并开始监听更新并将它们提交到第二个数据库上的相同表中。如果您的工作是复制一些表,但想为我的东西获取原始流,这很好。

正如我所提到的,CREATE SUBSCRIPTION查询是在后台的主数据库上创建一个复制槽,但是如何在没有订阅和第二个数据库的情况下手动创建一个?这里的文档说:

为了使这个工作,单独创建复制槽(使用函数 pg_create_logical_replication_slot 和插件名称 pgoutput)

根据文档,这是可能的,但pg_create_logical_replication_slot只会创建一个常规的复制槽。pgoutput插件负责所有的魔法吗?如果是,那么就无法使用其他插件wal2json,例如出版物。

我在这里想念什么?

4

2 回答 2

9

我在 Postgres 中的逻辑复制和逻辑解码经验有限,所以如果以下错误,请纠正我。话虽如此,这就是我发现的:

  1. 发布支持由pgoutput插件提供。您可以通过特定于插件的选项来使用它。可能是其他插件有可能加入支持,但不知道逻辑解码插件接口是否暴露了足够的细节。我针对wal2json插件版本对其进行了测试9e962ba,但它无法识别此选项。
  2. 复制槽是独立于发布创建的。在获取更改流时指定用作过滤器的发布。可以查看一个出版物的更改,然后查看另一个出版物的更改并观察不同的更改集,尽管使用了相同的复制槽(我没有发现它记录在案,我正在测试具有 Postgres 兼容性的 Aurora,因此行为可能会有所不同)。
  3. 插件输出似乎包括开始和提交的所有条目,即使事务没有触及感兴趣的发布中包含的任何表。但是,它不包括对出版物中未包含的其他表的更改。

这是一个如何在 Postgres 10+ 中使用它的示例:

-- Create publication
CREATE PUBLICATION cdc;

-- Create slot
SELECT pg_create_logical_replication_slot('test_slot_v1', 'pgoutput');

-- Create example table
CREATE TABLE replication_test_v1
(
  id integer NOT NULL PRIMARY KEY,
  name text
);

-- Add table to publication
ALTER PUBLICATION cdc ADD TABLE replication_test_v1;

-- Insert example data
INSERT INTO replication_test_v1(id, name) VALUES
  (1, 'Number 1')
;

-- Peak changes (does not consume changes)
SELECT pg_logical_slot_peek_binary_changes('test_slot_v1', NULL, NULL, 'publication_names', 'cdc', 'proto_version', '1');

-- Get changes (consumes changes)
SELECT pg_logical_slot_get_binary_changes('test_slot_v1', NULL, NULL, 'publication_names', 'cdc', 'proto_version', '1');

要将 Postgres 中的更改流式传输到其他系统,您可以考虑使用Debezium项目。它是一个用于变更数据捕获的开源分布式平台,其中包括一个 PostgreSQL 连接器。在 0.10 版中,他们增加了对pgoutput插件的支持。即使您的用例与项目提供的非常不同,您也可以查看他们的代码以了解它们如何与复制 API 交互。

于 2019-08-19T19:39:02.080 回答
5

创建逻辑复制槽和发布后,您可以通过以下方式创建订阅:

CREATE SUBSCRIPTION mysub
       CONNECTION <conn stuff>
       PUBLICATION foo
       WITH (slot_name=my_slot, create_slot=false);

不确定这是否回答了您的问题。

于 2019-06-21T12:00:22.060 回答