346

A very frequently asked question here is how to do an upsert, which is what MySQL calls INSERT ... ON DUPLICATE UPDATE and the standard supports as part of the MERGE operation.

Given that PostgreSQL doesn't support it directly (before pg 9.5), how do you do this? Consider the following:

CREATE TABLE testtable (
    id integer PRIMARY KEY,
    somedata text NOT NULL
);

INSERT INTO testtable (id, somedata) VALUES
(1, 'fred'),
(2, 'bob');

Now imagine that you want to "upsert" the tuples (2, 'Joe'), (3, 'Alan'), so the new table contents would be:

(1, 'fred'),
(2, 'Joe'),    -- Changed value of existing tuple
(3, 'Alan')    -- Added new tuple

That's what people are talking about when discussing an upsert. Crucially, any approach must be safe in the presence of multiple transactions working on the same table - either by using explicit locking, or otherwise defending against the resulting race conditions.

This topic is discussed extensively at Insert, on duplicate update in PostgreSQL?, but that's about alternatives to the MySQL syntax, and it's grown a fair bit of unrelated detail over time. I'm working on definitive answers.

These techniques are also useful for "insert if not exists, otherwise do nothing", i.e. "insert ... on duplicate key ignore".

4

6 回答 6

478

9.5 及更新版本:

PostgreSQL 9.5 和更新的支持INSERT ... ON CONFLICT (key) DO UPDATE(和ON CONFLICT (key) DO NOTHING),即 upsert。

与 比较ON DUPLICATE KEY UPDATE

快速解释

有关用法,请参阅手册- 特别是语法图中的冲突操作子句和解释性文本

与下面给出的 9.4 及更早版本的解决方案不同,此功能适用于多个冲突行,并且不需要排他锁定或重试循环。

添加该功能的提交在此处围绕其开发的讨论在此处


如果您使用的是 9.5 并且不需要向后兼容,您现在可以停止阅读


9.4 及以上版本:

PostgreSQL 没有任何内置UPSERT(或MERGE)工具,在并发使用的情况下有效地做到这一点非常困难。

本文详细讨论了这个问题

通常,您必须在两个选项之间进行选择:

  • 重试循环中的单独插入/更新操作;或者
  • 锁定表并进行批量合并

单行重试循环

如果您希望多个连接同时尝试执行插入,则在重试循环中使用单独的行 upsert 是合理的选择。

PostgreSQL 文档包含一个有用的过程,可让您在数据库内循环执行此操作。与大多数幼稚的解决方案不同,它可以防止丢失更新和插入竞争。不过,它只会在READ COMMITTED模式下工作,并且只有当它是您在事务中唯一做的事情时才是安全的。如果触发器或辅助唯一键导致唯一违规,该功能将无法正常工作。

这种策略非常低效。只要可行,您应该排队工作并按照如下所述进行批量更新。

许多尝试解决此问题的方法都没有考虑回滚,因此导致更新不完整。两笔交易相互竞争;其中之一成功INSERT;另一个得到一个重复的键错误并UPDATE改为执行。UPDATE等待INSERT回滚或提交的块。当它回滚时,UPDATE条件重新检查匹配零行,因此即使UPDATE提交实际上并没有完成您预期的 upsert。您必须检查结果行数并在必要时重试。

一些尝试的解决方案也未能考虑 SELECT 比赛。如果您尝试明显而简单的方法:

-- THIS IS WRONG. DO NOT COPY IT. It's an EXAMPLE.

BEGIN;

UPDATE testtable
SET somedata = 'blah'
WHERE id = 2;

-- Remember, this is WRONG. Do NOT COPY IT.

INSERT INTO testtable (id, somedata)
SELECT 2, 'blah'
WHERE NOT EXISTS (SELECT 1 FROM testtable WHERE testtable.id = 2);

COMMIT;

然后当两个同时运行时,会出现几种故障模式。一个是已经讨论过的更新重新检查问题。另一个是同时UPDATE匹配零行并继续。然后他们都做EXISTS测试,这发生INSERT. 两者都得到零行,所以都做INSERT. 一个因重复键错误而失败。

这就是您需要重试循环的原因。您可能认为可以使用巧妙的 SQL 防止重复键错误或丢失更新,但事实并非如此。您需要检查行数或处理重复的键错误(取决于选择的方法)并重试。

请不要为此推出自己的解决方案。就像消息队列一样,它可能是错误的。

带锁的批量更新插入

有时你想做一个批量更新,你有一个新的数据集,你想合并到一个旧的现有数据集中。这比单独的行upserts效率高得多,并且应该在可行时首选。

在这种情况下,您通常遵循以下过程:

  • CREATE一张TEMPORARY桌子

  • COPY或将新数据批量插入临时表

  • LOCK目标表IN EXCLUSIVE MODE。这允许其他事务对SELECT表进行任何更改,但不能对其进行任何更改。

  • 使用临时表中的值执行UPDATE ... FROM现有记录;

  • 执行INSERT目标表中尚不存在的行;

  • COMMIT, 释放锁。

例如,对于问题中给出的示例,使用多值INSERT填充临时表:

BEGIN;

CREATE TEMPORARY TABLE newvals(id integer, somedata text);

INSERT INTO newvals(id, somedata) VALUES (2, 'Joe'), (3, 'Alan');

LOCK TABLE testtable IN EXCLUSIVE MODE;

UPDATE testtable
SET somedata = newvals.somedata
FROM newvals
WHERE newvals.id = testtable.id;

INSERT INTO testtable
SELECT newvals.id, newvals.somedata
FROM newvals
LEFT OUTER JOIN testtable ON (testtable.id = newvals.id)
WHERE testtable.id IS NULL;

COMMIT;

相关阅读

怎么样MERGE

SQL 标准MERGE实际上具有定义不明确的并发语义,并且不适合在不先锁定表的情况下进行更新插入。

对于数据合并来说,这是一个非常有用的 OLAP 语句,但对于并发安全的 upsert,它实际上并不是一个有用的解决方案。对于使用其他 DBMSMERGE进行 upserts 的人有很多建议,但实际上是错误的。

其他数据库:

于 2013-06-24T02:57:02.627 回答
50

以下是insert ... on conflict ...( pg 9.5+ ) 的一些示例:

  • 插入,冲突 -什么都不做
    insert into dummy(id, name, size) values(1, 'new_name', 3)
    on conflict do nothing;`  
    
  • 插入,在冲突时更新,通过column指定冲突目标。
    insert into dummy(id, name, size) values(1, 'new_name', 3)
    on conflict(id)
    do update set name = 'new_name', size = 3;  
    
  • 插入,在冲突时更新,通过约束名称指定冲突目标。
    insert into dummy(id, name, size) values(1, 'new_name', 3)
    on conflict on constraint dummy_pkey
    do update set name = 'new_name', size = 4;
    
于 2018-01-30T11:22:14.320 回答
33

我正在尝试为 PostgreSQL 9.5 之前版本的单插入问题提供另一种解决方案。这个想法只是尝试首先执行插入,如果记录已经存在,则更新它:

do $$
begin 
  insert into testtable(id, somedata) values(2,'Joe');
exception when unique_violation then
  update testtable set somedata = 'Joe' where id = 2;
end $$;

请注意,只有在没有删除表行的情况下才能应用此解决方案。

我不知道这个解决方案的效率,但在我看来这很合理。

于 2015-06-14T13:14:43.847 回答
11

Postgres >=9.5 的 SQLAlchemy upsert

由于上面的大篇幅涵盖了 Postgres 版本的许多不同 SQL 方法(不仅仅是问题中的非 9.5),如果您使用的是 Postgres 9.5,我想在 SQLAlchemy 中添加如何做到这一点。除了实现自己的 upsert,您还可以使用 SQLAlchemy 的函数(在 SQLAlchemy 1.1 中添加)。就个人而言,如果可能的话,我会推荐使用这些。不仅因为方便,还因为它允许 PostgreSQL 处理可能发生的任何竞争条件。

我昨天给出的另一个答案的交叉发布(https://stackoverflow.com/a/44395983/2156909

SQLAlchemyON CONFLICT现在支持两种方法on_conflict_do_update()on_conflict_do_nothing()

从文档中复制:

from sqlalchemy.dialects.postgresql import insert

stmt = insert(my_table).values(user_email='a@b.com', data='inserted data')
stmt = stmt.on_conflict_do_update(
    index_elements=[my_table.c.user_email],
    index_where=my_table.c.user_email.like('%@gmail.com'),
    set_=dict(data=stmt.excluded.data)
    )
conn.execute(stmt)

http://docs.sqlalchemy.org/en/latest/dialects/postgresql.html?highlight=conflict#insert-on-conflict-upsert

于 2017-06-07T09:33:21.343 回答
4
WITH UPD AS (UPDATE TEST_TABLE SET SOME_DATA = 'Joe' WHERE ID = 2 
RETURNING ID),
INS AS (SELECT '2', 'Joe' WHERE NOT EXISTS (SELECT * FROM UPD))
INSERT INTO TEST_TABLE(ID, SOME_DATA) SELECT * FROM INS

在 Postgresql 9.3 上测试

于 2017-03-02T05:25:13.007 回答
0

由于此问题已关闭,因此我在此处发布有关您如何使用 SQLAlchemy 进行操作的信息。通过递归,它重试批量插入或更新以对抗竞争条件和验证错误。

首先是进口

import itertools as it

from functools import partial
from operator import itemgetter

from sqlalchemy.exc import IntegrityError
from app import session
from models import Posts

现在有几个辅助函数

def chunk(content, chunksize=None):
    """Groups data into chunks each with (at most) `chunksize` items.
    https://stackoverflow.com/a/22919323/408556
    """
    if chunksize:
        i = iter(content)
        generator = (list(it.islice(i, chunksize)) for _ in it.count())
    else:
        generator = iter([content])

    return it.takewhile(bool, generator)


def gen_resources(records):
    """Yields a dictionary if the record's id already exists, a row object 
    otherwise.
    """
    ids = {item[0] for item in session.query(Posts.id)}

    for record in records:
        is_row = hasattr(record, 'to_dict')

        if is_row and record.id in ids:
            # It's a row but the id already exists, so we need to convert it 
            # to a dict that updates the existing record. Since it is duplicate,
            # also yield True
            yield record.to_dict(), True
        elif is_row:
            # It's a row and the id doesn't exist, so no conversion needed. 
            # Since it's not a duplicate, also yield False
            yield record, False
        elif record['id'] in ids:
            # It's a dict and the id already exists, so no conversion needed. 
            # Since it is duplicate, also yield True
            yield record, True
        else:
            # It's a dict and the id doesn't exist, so we need to convert it. 
            # Since it's not a duplicate, also yield False
            yield Posts(**record), False

最后是 upsert 函数

def upsert(data, chunksize=None):
    for records in chunk(data, chunksize):
        resources = gen_resources(records)
        sorted_resources = sorted(resources, key=itemgetter(1))

        for dupe, group in it.groupby(sorted_resources, itemgetter(1)):
            items = [g[0] for g in group]

            if dupe:
                _upsert = partial(session.bulk_update_mappings, Posts)
            else:
                _upsert = session.add_all

            try:
                _upsert(items)
                session.commit()
            except IntegrityError:
                # A record was added or deleted after we checked, so retry
                # 
                # modify accordingly by adding additional exceptions, e.g.,
                # except (IntegrityError, ValidationError, ValueError)
                db.session.rollback()
                upsert(items)
            except Exception as e:
                # Some other error occurred so reduce chunksize to isolate the 
                # offending row(s)
                db.session.rollback()
                num_items = len(items)

                if num_items > 1:
                    upsert(items, num_items // 2)
                else:
                    print('Error adding record {}'.format(items[0]))

这是你如何使用它

>>> data = [
...     {'id': 1, 'text': 'updated post1'}, 
...     {'id': 5, 'text': 'updated post5'}, 
...     {'id': 1000, 'text': 'new post1000'}]
... 
>>> upsert(data)

这样做的优点bulk_save_objects是它可以在插入时处理关系、错误检查等(与批量操作不同)。

于 2017-04-26T11:30:37.220 回答