0

逻辑复制是否解析事务单元中的 WAL 文件?回滚事务呢?

而且,什么 API 可以在接收器端输入数据更改,而无需在 SQL 级别重放它们?就像 postgresql 的内置流复制接收器一样,无论是逻辑的还是物理的。

编辑

让我进一步澄清我的问题。

逻辑流式流程如下图所示:

postgresql 实例(发送方,使用特定输出插件创建插槽)--------流式协议----------> postgresql 实例(接收方,获取复制数据)

这里复制数据的格式由输出插件决定,我们假设它是纯文本。那么直截了当,我们可以把它当做 SQL 语句,在接收方 postgresql 中重放,但显然效率低下。是否有任何低级 API 来输入复制数据?

4

1 回答 1

1

我花了一些时间来调查源代码,并尝试自己回答这个问题。

  • postgresql walsender 仅包括已提交的事务,忽略中止的事务。

相关的代码路径和代码片段:

pg_logical_slot_get_changes_guts() -> LogicalDecodingProcessRecord() -> DecodeXactOp() -> 
ReorderBufferCommit() -> ReorderBufferIterTXNNext()

解码XactOp():

switch (info)
{
    case XLOG_XACT_COMMIT:
    case XLOG_XACT_COMMIT_PREPARED:
        {
            xl_xact_commit *xlrec;
            xl_xact_parsed_commit parsed;
            TransactionId xid;

            xlrec = (xl_xact_commit *) XLogRecGetData(r);
            ParseCommitRecord(XLogRecGetInfo(buf->record), xlrec, &parsed);

            if (!TransactionIdIsValid(parsed.twophase_xid))
                xid = XLogRecGetXid(r);
            else
                xid = parsed.twophase_xid;

            DecodeCommit(ctx, buf, &parsed, xid);
            break;
        }
  • 内置的逻辑应用工作者不会重播 SQL,而是使用许多内部 API 在接收方应用更改。

https://github.com/postgres/postgres/blob/master/src/backend/replication/logical/worker.c#L585

/* Input functions may need an active snapshot, so get one */
PushActiveSnapshot(GetTransactionSnapshot());

/* Process and store remote tuple in the slot */
oldctx = MemoryContextSwitchTo(GetPerTupleMemoryContext(estate));
slot_store_cstrings(remoteslot, rel, newtup.values);
slot_fill_defaults(rel, estate, remoteslot);
MemoryContextSwitchTo(oldctx);

ExecOpenIndices(estate->es_result_relation_info, false);

/* Do the insert. */
ExecSimpleRelationInsert(estate, remoteslot);

/* Cleanup. */
ExecCloseIndices(estate->es_result_relation_info);
PopActiveSnapshot();

PG10 的逻辑复制pgoutput作为输出插件,采用二进制格式,适合直接进入数据。

https://github.com/postgres/postgres/blob/master/src/backend/replication/pgoutput/pgoutput.c

于 2018-09-06T19:02:33.837 回答