6

嗨,我正在无限循环中连续读取逻辑复制流。我有另一个程序不断填充同一个数据库中的表。我注意到一段时间后(大约 5-10 分钟)。我总是得到例外

org.postgresql.util.PSQLException: Database connection failed when reading from copy
at org.postgresql.core.v3.QueryExecutorImpl.readFromCopy(QueryExecutorImpl.java:1035)
at org.postgresql.core.v3.CopyDualImpl.readFromCopy(CopyDualImpl.java:41)
at org.postgresql.core.v3.replication.V3PGReplicationStream.receiveNextData(V3PGReplicationStream.java:155)
at org.postgresql.core.v3.replication.V3PGReplicationStream.readInternal(V3PGReplicationStream.java:124)
at org.postgresql.core.v3.replication.V3PGReplicationStream.read(V3PGReplicationStream.java:70)
at com.datamirror.ts.scrapers.postgresqlscraper.PGStreamReceiver.main(PGStreamReceiver.java:60)
Caused by: java.net.SocketException: Connection reset
at java.net.SocketInputStream.read(SocketInputStream.java:220)
at java.net.SocketInputStream.read(SocketInputStream.java:152)
at org.postgresql.core.VisibleBufferedInputStream.readMore(VisibleBufferedInputStream.java:140)
at org.postgresql.core.VisibleBufferedInputStream.ensureBytes(VisibleBufferedInputStream.java:109)
at org.postgresql.core.VisibleBufferedInputStream.read(VisibleBufferedInputStream.java:67)
at org.postgresql.core.PGStream.receiveChar(PGStream.java:293)
at org.postgresql.core.v3.QueryExecutorImpl.processCopyResults(QueryExecutorImpl.java:1077)
at org.postgresql.core.v3.QueryExecutorImpl.readFromCopy(QueryExecutorImpl.java:1033)

这是我的示例程序:

import java.nio.ByteBuffer;
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.SQLException;
import java.util.Properties;
import java.util.concurrent.TimeUnit;

import org.postgresql.PGConnection;
import org.postgresql.PGProperty;
import org.postgresql.replication.LogSequenceNumber;
import org.postgresql.replication.PGReplicationStream;


public class PGStreamReceiver {

public static void main(String[] args) {
    try {
        LogSequenceNumber startLsn = LogSequenceNumber.valueOf("0/2D8D0F0");
         String url = "jdbc:postgresql://localhost:5432/postgres"
                 ;
         String user ="postgres";
          String password = "xxxx";
          Properties connectionProperties = new Properties();

          PGProperty.USER.set(connectionProperties, user);
          PGProperty.PASSWORD.set(connectionProperties, password);
          PGProperty.ASSUME_MIN_SERVER_VERSION.set(connectionProperties, PostgreSQLConstants.MINIMUM_SUPPORTED_POSTGRESQL_VERSION);
          PGProperty.REPLICATION.set(connectionProperties, PostgreSQLConstants.POSTGRESQL_REPLICATION_PROPERTY_VALUE);
          PGProperty.PREFER_QUERY_MODE.set(connectionProperties, PostgreSQLConstants.POSTGRESQL_REPLICATION_PREFERRED_QUERY_MODE);
          Connection postgresSQLConnection = DriverManager.getConnection(url, connectionProperties);
            PGConnection postgrePGConnectionWrapper = postgresSQLConnection.unwrap(PGConnection.class);
            PGReplicationStream stream = postgrePGConnectionWrapper.
                     getReplicationAPI()
                     .replicationStream()
                     .logical()
                     .withSlotName("pvn")
                     .withStartPosition(startLsn)
                     .withSlotOption(PostgreSQLConstants.INCLUDE_XID_IN_STREAM_CHANGES, true)
                     .withSlotOption(PostgreSQLConstants.INCLUDE_TIMESTAMP_IN_STREAM_CHANGES, true)
                     .withSlotOption(PostgreSQLConstants.EXCLUDE_EMPTY_TRANSACTION_IN_CHANGES, true)
                     //.withStatusInterval(PostgreSQLConstants.SERVER_FEEDBACK_TIME_INTERVAL_IN_SECONDS, TimeUnit.SECONDS)
                     .withStatusInterval(60, TimeUnit.SECONDS)
                     .start();
            while(true)
            {
                ByteBuffer msg = stream.read();
                if(msg == null)
                {
                    return;
                }
                int offset = msg.arrayOffset();
                byte[] source = msg.array();
                int length = source.length - offset;
                String logData = new String(source, offset, length);
                System.out.print("1");
            }
        } catch (SQLException e) {
            // TODO Auto-generated catch block
            e.printStackTrace();
        }
    }

}

任何帮助表示赞赏!我正在使用 postgres JDBC 驱动程序 42.2.2。

4

0 回答 0