1

我需要使用新转换的 ID(作为“记录”传入)更新一个相当大的 psql 表。我在下面创建了这个函数来利用 pgxpool 的连接池和请求批处理,如果我使用不同的客户端应用这些事务,它们会更新数据库,如果我查看打印出来的结果,它们表明每次有 1 行受到影响,但是当我从数据库中检索这些行,它们保持不变。我是否以某种方式错误地使用了批处理?

import (
    ...
    "github.com/jackc/pgconn"
    "github.com/jackc/pgx/v4"
    "github.com/jackc/pgx/v4/pgxpool"

    log "github.com/sirupsen/logrus"
)

func SetNewSubscriptionValuesBatch(dsn string, records map[string]string) error {
    
    var db *pgxpool.Pool

    db, err := pgxpool.Connect(context.Background(), dsn)
    defer db.Close()
    if err != nil {
        panic(err)
    }
    
    tx, err := db.Begin(context.Background())
    if err != nil {
        panic(err)
    }

    b := &pgx.Batch{}

    for id, subID := range records {

    sqlStatement := `
UPDATE event
SET subscription_id_2 = $2
WHERE id = $1;`
        b.Queue(sqlStatement, id, subID)
    }
    
    batchResults := tx.SendBatch(context.Background(), b)
    
    var berr error
    var result pgconn.CommandTag
    for berr == nil {
        result, berr = batchResults.Exec()
        log.WithField("result", result.String()).WithField("result.rows-affected", result.RowsAffected()).Info("batch-result")
    }
    return nil
}

打印出:

{"level":"info","msg":"batch-result","result":"UPDATE 1","result.rows-affected":1,"time":"2020-10-14T16:47:25+01:00"}
{"level":"info","msg":"batch-result","result":"UPDATE 1","result.rows-affected":1,"time":"2020-10-14T16:47:25+01:00"}
{"level":"info","msg":"batch-result","result":"UPDATE 1","result.rows-affected":1,"time":"2020-10-14T16:47:25+01:00"}
{"level":"info","msg":"batch-result","result":"UPDATE 1","result.rows-affected":1,"time":"2020-10-14T16:47:25+01:00"}
{"level":"info","msg":"batch-result","result":"UPDATE 1","result.rows-affected":1,"time":"2020-10-14T16:47:25+01:00"}
{"level":"info","msg":"batch-result","result":"UPDATE 1","result.rows-affected":1,"time":"2020-10-14T16:47:25+01:00"}
{"level":"info","msg":"batch-result","result":"UPDATE 1","result.rows-affected":1,"time":"2020-10-14T16:47:25+01:00"}
{"level":"info","msg":"batch-result","result":"UPDATE 1","result.rows-affected":1,"time":"2020-10-14T16:47:25+01:00"}
{"level":"info","msg":"batch-result","result":"UPDATE 1","result.rows-affected":1,"time":"2020-10-14T16:47:25+01:00"}
{"level":"info","msg":"batch-result","result":"","result.rows-affected":0,"time":"2020-10-14T16:47:25+01:00"}
4

1 回答 1

2

根据@Adrian Klaver 的建议,我进行了这些更改以完成交易:

  1. 事务仍然需要明确提交

  2. 在 SendBatch 之后,事务连接将保持打开状态。在使用连接进行提交之前,可能需要释放这些。

以下是更改后的代码:

import (
    ...
    "github.com/jackc/pgconn"
    "github.com/jackc/pgx/v4"
    "github.com/jackc/pgx/v4/pgxpool"

    log "github.com/sirupsen/logrus"
)

func SetNewSubscriptionValuesBatch(dsn string, records map[string]string) error {

    ctx := context.Background()
    var db *pgxpool.Pool

    db, err := pgxpool.Connect(ctx, dsn)
    defer db.Close()
    if err != nil {
        panic(err)
    }
    
    tx, err := db.Begin(ctx)
    if err != nil {
        panic(err)
    }

    b := &pgx.Batch{}

    for id, subID := range records {

    sqlStatement := `
UPDATE event
SET subscription_id_2 = $2
WHERE id = $1;`
        b.Queue(sqlStatement, id, subID)
    }
    
    batchResults := tx.SendBatch(ctx, b)
    
    var qerr error
    var rows pgx.Rows
    for qerr == nil {
        rows, qerr = batchResults.Query()
        rows.Close()
    }
    
    return tx.Commit(ctx)
}
于 2020-10-15T09:49:10.590 回答