1

我正在使用pgx库在 Go 中使用 Postgres 逻辑复制,以通过使用 wal2json 作为输出插件的逻辑复制槽获取数据库更改。正在使用的 Postgres 版本是v10.1.

WaitForReplicationMessage循环中,我收到一条消息,如果该消息是 a ServerHeartbeat,我发送一条备用状态消息以更新我在 WAL 中消费位置的服务器。此StandbyStatus消息有一个字段,称为ReplyRequestedwhich,如果等于,则1告诉服务器发送一个ServerHeartbeat; 如果价值是0它不应该做任何事情。

现在我正在发送一条StandbyStatus消息,其ReplyRequested值为 to 0(这是创建对象时的默认值)。在发送此消息时,服务器会发送心跳消息,尽管我告诉它不要这样做。我无法看到这个问题的原因。

这是我的代码:

    for {
    log.Info("Waiting for message")

    message, err := session.ReplConn.WaitForReplicationMessage(context.TODO())
    if err != nil {
        log.WithError(err).Errorf("%s", reflect.TypeOf(err))
        continue
    }

    if message.WalMessage != nil {
        log.Info(string(message.WalMessage.WalData))
    } else if message.ServerHeartbeat != nil {
        log.Info("Heartbeat requested")
        // set the flushed LSN (and other LSN values) in the standby status and send to PG
        log.Info(message.ServerHeartbeat)

        // send Standby Status with the LSN position            
        err = session.sendStandbyStatus()
        if err != nil {
            log.WithError(err).Error("Unable to send standby status")
        }

    }
}

上面的sendStandbyStatus函数是:

func (session *Session) sendStandbyStatus() error {
  standbyStatus, err := pgx.NewStandbyStatus(session.RestartLSN)
  if err != nil {
      return err
  }
  log.Info(standbyStatus) // the output of this confirms ReplyRequested is indeed 0
  standbyStatus.ReplyRequested = 0 // still set it
  err = session.ReplConn.SendStandbyStatus(standbyStatus)
  if err != nil {
      return err
  }
  return nil
}
4

0 回答 0