48

如何使用 Go 的 database/sql 包批处理 SQL 语句？

在Java中我会这样做:

// Create a prepared statement with a single "?" parameter
String sql = "INSERT INTO my_table VALUES(?)";
PreparedStatement pstmt = connection.prepareStatement(sql);

// Queue 10 rows: bind parameter 1 to "0".."9" and add each binding to the batch
for (int i=0; i<10; i++) {
    pstmt.setString(1, ""+i);
    pstmt.addBatch();
}

// Execute all queued bindings as one batch; updateCounts receives the returned update counts
int [] updateCounts = pstmt.executeBatch();

我如何在 Go 中实现相同的目标?

4

12 个回答

85

由于 `db.Exec` 是可变参数（variadic）函数，一种做法（实际上只需要一次网络往返）是自己拼接语句，再把参数展开后一次性传入。

示例代码:

// BulkInsert inserts all unsavedRows into my_sample_table with a single
// multi-row INSERT, so the whole batch costs one round trip. Placeholders are
// plain "?" (MySQL/SQLite style).
//
// An empty slice is a no-op. Without that guard the statement would end in a
// bare "VALUES " and the database would reject it.
//
// NOTE(review): databases cap the number of bind parameters per statement;
// very large inputs may need to be split into chunks. Confirm the limit for
// the target database.
func BulkInsert(unsavedRows []*ExampleRowStruct) error {
	if len(unsavedRows) == 0 {
		return nil
	}
	valueStrings := make([]string, 0, len(unsavedRows))
	valueArgs := make([]interface{}, 0, len(unsavedRows)*3)
	for _, post := range unsavedRows {
		valueStrings = append(valueStrings, "(?, ?, ?)")
		// Arguments must follow the placeholder order: row by row, column by column.
		valueArgs = append(valueArgs, post.Column1, post.Column2, post.Column3)
	}
	stmt := fmt.Sprintf("INSERT INTO my_sample_table (column1, column2, column3) VALUES %s",
		strings.Join(valueStrings, ","))
	_, err := db.Exec(stmt, valueArgs...)
	return err
}

在我运行的一个简单测试中,该解决方案在插入 10,000 行时比另一个答案中提出的 Begin、Prepare、Commit 快了大约 4 倍——尽管实际改进很大程度上取决于您的个人设置、网络延迟等。

于 2014-08-07T21:03:36.963 回答
17

如果您使用的是 PostgreSQL，那么 pq 驱动支持批量导入（`pq.CopyIn`）。

于 2015-04-16T15:04:42.350 回答
16

针对不支持 `?` 占位符的 PostgreSQL 改写 Andrew 的解决方案，以下代码可行：

// BulkInsert inserts all unsavedRows into my_sample_table with a single
// multi-row INSERT using PostgreSQL's numbered placeholders ($1, $2, ...).
//
// Placeholders are numbered across the whole statement: row i uses
// $3i+1 .. $3i+3. An empty slice is a no-op; otherwise the statement would end
// in a bare "VALUES " and be rejected.
//
// NOTE(review): PostgreSQL limits the number of bind parameters per statement,
// so very large inputs may need chunking. Confirm against the server version.
func BulkInsert(unsavedRows []*ExampleRowStruct) error {
	if len(unsavedRows) == 0 {
		return nil
	}
	valueStrings := make([]string, 0, len(unsavedRows))
	valueArgs := make([]interface{}, 0, len(unsavedRows)*3)
	for i, post := range unsavedRows {
		base := i * 3
		valueStrings = append(valueStrings, fmt.Sprintf("($%d, $%d, $%d)", base+1, base+2, base+3))
		valueArgs = append(valueArgs, post.Column1, post.Column2, post.Column3)
	}
	stmt := fmt.Sprintf("INSERT INTO my_sample_table (column1, column2, column3) VALUES %s", strings.Join(valueStrings, ","))
	_, err := db.Exec(stmt, valueArgs...)
	return err
}
于 2018-01-03T02:25:38.960 回答
7

在 Avi Flax 答案的基础上，我还需要在 INSERT 中加入 ON CONFLICT DO UPDATE 子句。

解决这个问题的方法是 COPY 到一个临时表(设置为在事务结束时删除),然后从临时表 INSERT 到永久表。

这是我确定的代码:

// saveToDBBulk upserts every entry of items in a single transaction.
//
// The rows are first streamed with COPY (pq.CopyIn) into a temporary table.
// They are then moved into the permanent table with
// INSERT ... SELECT ... ON CONFLICT DO UPDATE, which is how the ON CONFLICT
// clause is applied to a COPY-loaded batch. Any failure rolls the whole
// transaction back, and the error is returned wrapped with the step that
// failed.
func (fdata *FDataStore) saveToDBBulk(items map[fdataKey][]byte) (err error) {
	tx, err := fdata.db.Begin()
	if err != nil {
		return errors.Wrap(err, "begin transaction")
	}
	txOK := false
	defer func() {
		if !txOK {
			// Best effort: the error that caused the early return is the one
			// worth reporting, so the rollback error is deliberately dropped.
			_ = tx.Rollback()
		}
	}()

	// sqlFDataMakeTempTable (defined elsewhere) is:
	//   CREATE TEMPORARY TABLE fstore_data_load
	//   (map text NOT NULL, key text NOT NULL, data json)
	//   ON COMMIT DROP
	// ON COMMIT DROP cleans the table up at the end of the transaction. The
	// "for{..} state machine" goroutine in charge of delayed saving ensures
	// this function never runs twice at the same time.
	if _, err = tx.Exec(sqlFDataMakeTempTable); err != nil {
		return errors.Wrap(err, "create temporary table")
	}

	stmt, err := tx.Prepare(pq.CopyIn(_sqlFDataTempTableName, "map", "key", "data"))
	if err != nil {
		// Without this check a failed Prepare leaves stmt nil and the first
		// stmt.Exec below panics.
		return errors.Wrap(err, "prepare COPY stmt")
	}
	for key, val := range items {
		if _, err = stmt.Exec(string(key.Map), string(key.Key), string(val)); err != nil {
			return errors.Wrap(err, "loading COPY data")
		}
	}

	// An Exec with no arguments flushes the buffered COPY data.
	if _, err = stmt.Exec(); err != nil {
		return errors.Wrap(err, "flush COPY data")
	}
	if err = stmt.Close(); err != nil {
		return errors.Wrap(err, "close COPY stmt")
	}

	// sqlFDataSetFromTemp (defined elsewhere) is:
	//   INSERT INTO fstore_data (map, key, data)
	//   SELECT map, key, data FROM fstore_data_load
	//   ON CONFLICT DO UPDATE SET data = EXCLUDED.data
	if _, err = tx.Exec(sqlFDataSetFromTemp); err != nil {
		return errors.Wrap(err, "move from temporary to real table")
	}

	if err = tx.Commit(); err != nil {
		return errors.Wrap(err, "commit transaction")
	}
	txOK = true
	return nil
}
于 2017-01-15T18:21:32.487 回答
4

如果您使用 Postgres，这里是 @Debasish Mitra 的解决方案。

运行示例：https://play.golang.org/p/dFFD2MrEy3J

替代示例：https://play.golang.org/p/vUtW0K4jVMd

// Sample rows; each Person contributes three values (FirstName, LastName, Age).
data := []Person{{"John", "Doe", 27}, {"Leeroy", "Jenkins", 19}}

// Flatten all rows into one argument list, in placeholder order.
vals := []interface{}{}
for _, row := range data {
    vals = append(vals, row.FirstName, row.LastName, row.Age)
}

// Expand %s into one "(?, ?, ?)" group per row, then renumber every ? as $1, $2, ...
sqlStr := `INSERT INTO test(column1, column2, column3) VALUES %s`
sqlStr = ReplaceSQL(sqlStr, "(?, ?, ?)", len(data))

// Prepare and execute the statement.
// NOTE(review): the errors from Prepare and Exec are discarded and stmt is never
// closed; real code must check both errors and defer stmt.Close().
stmt, _ := db.Prepare(sqlStr)
res, _ := stmt.Exec(vals...)

`ReplaceSQL` 函数：

// ReplaceSQL expands a multi-row INSERT template for PostgreSQL.
//
// stmt is used as a fmt format string and must contain exactly one %s verb
// and no other % verbs. The %s is replaced by rowCount copies of pattern
// joined with ",". Then every "?" in the resulting statement is renumbered,
// left to right, as $1, $2, $3, ...
//
// The copies are joined rather than each followed by ",", so no stray comma
// is left behind. This means %s may be followed by more SQL, for example
// "... VALUES %s ON CONFLICT DO NOTHING".
//
// The renumbering is a plain text scan: a "?" anywhere in stmt, including
// inside a quoted literal, also becomes a placeholder. rowCount must be
// non-negative (strings.Repeat panics otherwise). The whole statement is
// rebuilt in a single linear pass.
func ReplaceSQL(stmt, pattern string, rowCount int) string {
	values := strings.TrimSuffix(strings.Repeat(pattern+",", rowCount), ",")
	expanded := fmt.Sprintf(stmt, values)

	var b strings.Builder
	b.Grow(len(expanded))
	n := 0
	for i := 0; i < len(expanded); i++ {
		// '?' is ASCII, so a byte scan is UTF-8 safe: continuation bytes never equal it.
		if expanded[i] != '?' {
			b.WriteByte(expanded[i])
			continue
		}
		n++
		b.WriteByte('$')
		b.WriteString(strconv.Itoa(n))
	}
	return b.String()
}
于 2019-03-11T00:27:04.237 回答
3

如果有人使用 pgx（号称 Go 中最好的 Postgres 驱动），请参阅此解决方案：https://github.com/jackc/pgx/issues/764#issuecomment-685249471

于 2020-09-02T03:46:33.037 回答
2

我让 pq.CopyIn 跑通了，它实际上比字符串值/参数方法快约 2.4 倍（顺便说一句，那个方法非常实用且优雅，谢谢！）

我将 1000 万组 int、varchar 测试值放入结构体中，并用下面的函数加载它们。我对 Go 还比较陌生，请多包涵……

// copyData bulk-loads dataModels into the _temp table (columns a, b) with a
// single COPY (pq.CopyIn) inside one transaction.
//
// The transaction is committed only if every step succeeds, and a commit
// failure is returned to the caller. On any earlier error the deferred
// rollback discards the partial load.
func copyData(client *client.DbClient, dataModels []*dataModel) error {
	// Use the pool pointer directly: dereferencing it would copy the pool
	// struct, including its internal mutex.
	txn, err := client.DB.Begin()
	if err != nil {
		return err
	}
	txOK := false
	defer func() {
		if !txOK {
			// Best effort: the error that caused the early return is the one to report.
			_ = txn.Rollback()
		}
	}()

	stmt, err := txn.Prepare(pq.CopyIn("_temp", "a", "b"))
	if err != nil {
		return err
	}

	for _, model := range dataModels {
		if _, err := stmt.Exec(model.a, model.b); err != nil {
			return err
		}
	}

	// An Exec with no arguments flushes the buffered COPY data.
	if _, err := stmt.Exec(); err != nil {
		return err
	}
	if err := stmt.Close(); err != nil {
		return err
	}

	// Commit explicitly so its error is not lost.
	if err := txn.Commit(); err != nil {
		return err
	}
	txOK = true
	return nil
}

`

耗时（字符串值/参数方法）：1m30.60s。

耗时（CopyIn）：37.57s。

于 2020-05-28T20:51:57.583 回答
2

对于 Postgres，lib/pq 支持批量插入：https://godoc.org/github.com/lib/pq#hdr-Bulk_imports

不过，用下面的代码也能达到同样的效果；它真正有用的地方在于执行批量条件更新（相应地修改查询即可）。

要为 Postgres 执行类似的批量插入,您可以使用以下函数。

// ReplaceSQL replaces each non-overlapping occurrence of searchPattern in old,
// scanning left to right, with an increasing PostgreSQL placeholder: $1, $2,
// $3, ...
//
// The string is walked once, and text that has already been substituted is
// never searched again. This makes the function linear. It also means a
// pattern that appears in the generated "$n" text (such as "$") cannot be
// matched a second time. An empty searchPattern matches nothing, and old is
// returned unchanged.
func ReplaceSQL(old, searchPattern string) string {
	if searchPattern == "" {
		return old
	}
	var b strings.Builder
	b.Grow(len(old))
	n := 0
	rest := old
	for {
		idx := strings.Index(rest, searchPattern)
		if idx < 0 {
			break
		}
		n++
		b.WriteString(rest[:idx])
		b.WriteByte('$')
		b.WriteString(strconv.Itoa(n))
		rest = rest[idx+len(searchPattern):]
	}
	b.WriteString(rest)
	return b.String()
}

于是上面的示例变为：

sqlStr := "INSERT INTO test(n1, n2, n3) VALUES "
vals := []interface{}{}
placeholders := make([]string, 0, len(data))

for _, row := range data {
	placeholders = append(placeholders, "(?, ?, ?)")      // one "?" per column
	vals = append(vals, row["v1"], row["v2"], row["v3"]) // one row["v{n}"] per column
}

// Join the groups in one pass. This replaces the quadratic += inside the loop,
// and there is no trailing comma left to trim.
sqlStr += strings.Join(placeholders, ",")

// Replace each ? with $n for postgres
sqlStr = ReplaceSQL(sqlStr, "?")

// Prepare the statement and execute it with all values at once.
// NOTE(review): the errors from Prepare and Exec are discarded and stmt is never
// closed; real code must check both errors and defer stmt.Close().
stmt, _ := db.Prepare(sqlStr)
res, _ := stmt.Exec(vals...)
于 2019-03-07T08:10:41.010 回答
1

database/sql 提供的接口无法做批处理。不过，特定的数据库驱动可能会单独支持，例如 https://github.com/ziutek/mymysql 似乎支持 MySQL 的批处理。

于 2012-09-19T12:13:19.863 回答
1

借鉴 Andrew C 的思路，并改用 SQL 标量变量（命名参数 `@pN`）来满足我的工作需要。它非常符合我工作中的特定要求。也许它对某些人也有用，因为它能在 Go 中模拟 SQL 的批处理事务。思路如下。

// BulkInsert inserts all unsavedRows into my_sample_table with one multi-row
// INSERT. Its placeholders are named parameters (@p1, @p2, ...), bound via
// sql.NamedArg values.
//
// Parameters are numbered across the whole statement: row i uses
// @p(3i+1) .. @p(3i+3). An empty slice is a no-op; otherwise the statement
// would end in a bare "VALUES " and be rejected.
//
// NOTE(review): servers cap the number of parameters per request, so very
// large batches may need to be split. Confirm the limit for the target server.
func BulkInsert(unsavedRows []*ExampleRowStruct) error {
	if len(unsavedRows) == 0 {
		return nil
	}
	const numCols = 3
	valueStrings := make([]string, 0, len(unsavedRows))
	params := make([]interface{}, 0, len(unsavedRows)*numCols)
	for i, post := range unsavedRows {
		base := i * numCols
		valueStrings = append(valueStrings, fmt.Sprintf("(@p%d, @p%d, @p%d)", base+1, base+2, base+3))
		// Build the named args in the same pass, with names matching the placeholders above.
		params = append(params,
			sql.Named(fmt.Sprintf("p%d", base+1), post.Column1),
			sql.Named(fmt.Sprintf("p%d", base+2), post.Column2),
			sql.Named(fmt.Sprintf("p%d", base+3), post.Column3),
		)
	}
	sqlQuery := fmt.Sprintf("INSERT INTO my_sample_table (column1, column2, column3) VALUES %s", strings.Join(valueStrings, ","))

	_, err := db.Exec(sqlQuery, params...)
	return err
}
于 2018-10-06T00:52:45.523 回答
0

另一个值得一看、支持链式语法的好库是 go-pg：

https://github.com/go-pg/pg/wiki/Writing-Queries#insert

使用单个查询插入多本书:

// Passing several models to db.Model inserts all of the books with a single query.
// NOTE(review): err must still be checked by the surrounding code.
err := db.Model(book1, book2).Insert()
于 2019-07-24T10:27:27.320 回答
0

这是一个更通用的版本,用于根据@andrew-c 和@mastercarl 的答案生成查询和值参数:

// bulk/insert.go

import (
    "strconv"
    "strings"
)

// ValueExtractor returns the column values for the row at the given index, in
// the same order as the columns passed to Generate.
type ValueExtractor = func(int) []interface{}

// Generate builds a multi-row INSERT statement for tableName and the flat
// argument list to execute it with.
//
// Column names are wrapped in double quotes, and any double quote inside a
// name is doubled so the identifier stays well-formed. tableName is written
// verbatim, so it must come from trusted code, not user input.
//
// When postgres is true, placeholders are numbered ($1, $2, ...) across the
// whole statement; otherwise each one is a plain "?". valueExtractor is called
// once per row, in order, and its results are appended to the returned
// arguments. It should return exactly len(columns) values.
//
// A numRows of zero or less yields a statement with an empty VALUES list,
// which databases reject; callers should skip the query in that case.
//
// NOTE(review): the non-postgres mode also double-quotes identifiers; MySQL
// only accepts that with ANSI_QUOTES enabled. Confirm for the target database.
func Generate(tableName string, columns []string, numRows int, postgres bool, valueExtractor ValueExtractor) (string, []interface{}) {
	if numRows < 0 {
		// Clamp so make below cannot panic on a negative capacity.
		numRows = 0
	}
	numCols := len(columns)

	var qb strings.Builder
	qb.WriteString("INSERT INTO ")
	qb.WriteString(tableName)
	qb.WriteString("(")
	for i, column := range columns {
		if i > 0 {
			qb.WriteString(",")
		}
		qb.WriteString(`"`)
		qb.WriteString(strings.ReplaceAll(column, `"`, `""`))
		qb.WriteString(`"`)
	}
	qb.WriteString(") VALUES ")

	valueArgs := make([]interface{}, 0, numRows*numCols)
	for rowIndex := 0; rowIndex < numRows; rowIndex++ {
		if rowIndex > 0 {
			qb.WriteString(",")
		}
		qb.WriteString("(")
		for colIndex := 0; colIndex < numCols; colIndex++ {
			if colIndex > 0 {
				qb.WriteString(",")
			}
			if postgres {
				qb.WriteString("$")
				qb.WriteString(strconv.Itoa(rowIndex*numCols + colIndex + 1))
			} else {
				qb.WriteString("?")
			}
		}
		qb.WriteString(")")
		valueArgs = append(valueArgs, valueExtractor(rowIndex)...)
	}
	return qb.String(), valueArgs
}

// bulk/insert_test.go

import (
    "fmt"
    "strconv"
)

// valueExtractor is the ValueExtractor used by the Generate examples. Row
// index becomes the three values "trx-<index>", "name-<index>" and the bare
// index.
func valueExtractor(index int) []interface{} {
	suffix := strconv.Itoa(index)
	row := []interface{}{"trx-" + suffix, "name-" + suffix}
	return append(row, index)
}

// ExampleGenerate_postgres shows the numbered ($n) placeholders Generate
// emits for PostgreSQL.
//
// The "_postgres" suffix, which starts with a lowercase letter, attaches the
// example to Generate. The old name ExampleGeneratePostgres referred to a
// nonexistent identifier and is flagged by vet's tests check. The arguments
// come back as one flat slice, which is what the expected output shows.
func ExampleGenerate_postgres() {
	query, valueArgs := Generate("tbl_persons", []string{"transaction_id", "name", "age"}, 3, true, valueExtractor)
	fmt.Println(query)
	fmt.Println(valueArgs)
	// Output:
	// INSERT INTO tbl_persons("transaction_id","name","age") VALUES ($1,$2,$3),($4,$5,$6),($7,$8,$9)
	// [trx-0 name-0 0 trx-1 name-1 1 trx-2 name-2 2]
}

// ExampleGenerate_others shows the plain "?" placeholders Generate emits for
// non-PostgreSQL databases.
//
// The lowercase "_others" suffix attaches the example to Generate. The old
// name ExampleGenerateOthers referred to a nonexistent identifier. The
// arguments come back as one flat slice, which is what the expected output
// shows.
func ExampleGenerate_others() {
	query, valueArgs := Generate("tbl_persons", []string{"transaction_id", "name", "age"}, 3, false, valueExtractor)
	fmt.Println(query)
	fmt.Println(valueArgs)
	// Output:
	// INSERT INTO tbl_persons("transaction_id","name","age") VALUES (?,?,?),(?,?,?),(?,?,?)
	// [trx-0 name-0 0 trx-1 name-1 1 trx-2 name-2 2]
}
于 2019-11-04T15:43:54.280 回答