我正在使用pgx库在 Go 中使用 Postgres 逻辑复制,以通过使用 wal2json 作为输出插件的逻辑复制槽获取数据库更改。正在使用的 Postgres 版本是v10.1
.
在WaitForReplicationMessage
循环中,我收到一条消息,如果该消息是 a ServerHeartbeat
,我发送一条备用状态消息以更新我在 WAL 中消费位置的服务器。此StandbyStatus
消息有一个字段,称为ReplyRequested
which,如果等于,则1
告诉服务器发送一个ServerHeartbeat
; 如果价值是0
它不应该做任何事情。
现在我正在发送一条StandbyStatus
消息,其ReplyRequested
值为 to 0
(这是创建对象时的默认值)。在发送此消息时,服务器会发送心跳消息,尽管我告诉它不要这样做。我无法看到这个问题的原因。
这是我的代码:
for {
log.Info("Waiting for message")
message, err := session.ReplConn.WaitForReplicationMessage(context.TODO())
if err != nil {
log.WithError(err).Errorf("%s", reflect.TypeOf(err))
continue
}
if message.WalMessage != nil {
log.Info(string(message.WalMessage.WalData))
} else if message.ServerHeartbeat != nil {
log.Info("Heartbeat requested")
// set the flushed LSN (and other LSN values) in the standby status and send to PG
log.Info(message.ServerHeartbeat)
// send Standby Status with the LSN position
err = session.sendStandbyStatus()
if err != nil {
log.WithError(err).Error("Unable to send standby status")
}
}
}
上面的sendStandbyStatus
函数是:
func (session *Session) sendStandbyStatus() error {
standbyStatus, err := pgx.NewStandbyStatus(session.RestartLSN)
if err != nil {
return err
}
log.Info(standbyStatus) // the output of this confirms ReplyRequested is indeed 0
standbyStatus.ReplyRequested = 0 // still set it
err = session.ReplConn.SendStandbyStatus(standbyStatus)
if err != nil {
return err
}
return nil
}