42

我的应用程序使用范围会话和 SQLALchemy 的声明式样式。它是一个网络应用程序,许多数据库插入由Celery任务调度程序执行。

通常,在决定插入对象时,我的代码可能会执行以下操作:

from schema import Session
from schema.models import Bike

pk = 123 # primary key
bike = Session.query(Bike).filter_by(bike_id=pk).first()
if not bike: # no bike in DB
    new_bike = Bike(pk, "shiny", "bike")
    Session.add(new_bike)
    Session.commit()

这里的问题是,因为其中很多工作是由异步工作人员完成的,所以一个工作可能在插入 a Bikewith时工作到一半id=123,而另一个工作正在检查它的存在。在这种情况下,第二个工作人员将尝试插入具有相同主键的行,并且 SQLAlchemy 将引发IntegrityError.

除了换成以下产品之外,我一生都找不到解决此问题的好方法Session.commit()

'''schema/__init__.py'''
from sqlalchemy.orm import scoped_session, sessionmaker
Session = scoped_session(sessionmaker())

def commit(ignore=False):
    try:
        Session.commit()
    except IntegrityError as e:
        reason = e.message
        logger.warning(reason)

        if not ignore:
            raise e

        if "Duplicate entry" in reason:
            logger.info("%s already in table." % e.params[0])
            Session.rollback()

然后在我所拥有的任何地方,Session.commit我现在都拥有schema.commit(ignore=True)我不介意不再插入该行的地方。

对我来说,由于字符串检查,这似乎很脆弱。仅供参考,当 anIntegrityError被提出时,它看起来像这样:

(IntegrityError) (1062, "Duplicate entry '123' for key 'PRIMARY'")

所以当然是我插入的主键是Duplicate entry is a cool thing我想我可能会错过IntegrityError的东西,这实际上并不是因为重复的主键。

有没有更好的方法来保持我正在使用的干净的 SQLAlchemy 方法(而不是开始在字符串中写出语句等......)

Db 是 MySQL(尽管对于单元测试,我喜欢使用 SQLite,并且不想用任何新方法阻碍这种能力)。

干杯!

4

6 回答 6

38

如果您使用session.merge(bike)而不是session.add(bike),那么您将不会产生主键错误。将bike根据需要检索和更新或创建。

于 2012-07-23T21:22:35.863 回答
10

您应该IntegrityError以相同的方式处理:回滚事务,并可选择重试。有些数据库甚至不会让你在IntegrityError. 您还可以在两个冲突事务开始时获取表上的锁,或者如果数据库允许,则获取更细粒度的锁。

使用该with语句显式开始事务,并自动提交(或在任何异常时回滚):

from schema import Session
from schema.models import Bike

session = Session()
with session.begin():
    pk = 123 # primary key
    bike = session.query(Bike).filter_by(bike_id=pk).first()
    if not bike: # no bike in DB
        new_bike = Bike(pk, "shiny", "bike")
        session.add(new_bike)
于 2012-04-25T20:13:51.903 回答
4

而不是session.add(obj)你需要使用下面提到的代码,这会更干净,你不需要像你提到的那样使用自定义提交功能。但是,这将忽略冲突,不仅对于重复键,对于其他键也是如此。

mysql:

 self.session.execute(insert(self.table, values=values, prefixes=['IGNORE']))

sqlite

self.session.execute(insert(self.table, values=values, prefixes=['OR IGNORE']))
于 2015-09-19T18:54:12.970 回答
3

我假设您在这里的主键在某种程度上是自然的,这就是为什么您不能依赖正常的自动增量技术。因此,假设问题实际上是您需要插入的一些独特列之一,这更常见。

如果你想要“尝试插入,失败时部分回滚”,你可以使用一个 SAVEPOINT,它在 SQLAlchemy 中是 begin_nested()。下一个 rollback() 或 commit() 只作用于那个 SAVEPOINT,而不是更大范围的事情发生。

但是,总的来说,这里的模式只是应该真正避免的模式。你真正想在这里做的是三件事之一。1. 不要运行处理需要插入的相同键的并发作业。2. 以某种方式在正在使用的并发键上同步作业和 3. 使用一些公共服务来生成这种特定类型的新记录,由作业共享(或确保它们在作业运行之前都已设置好)。

如果您考虑一下,#2无论如何都会以高度孤立的方式发生。启动两个 postgres 会话。第 1 节:

test=> create table foo(id integer primary key);
NOTICE:  CREATE TABLE / PRIMARY KEY will create implicit index "foo_pkey" for table "foo"
CREATE TABLE
test=> begin;
BEGIN
test=> insert into foo (id) values (1);

会议 2:

test=> begin;
BEGIN
test=> insert into foo(id) values(1);

您将看到会话 2 阻塞,因为 PK #1 的行被锁定。我不确定 MySQL 是否足够聪明,可以做到这一点,但这是正确的行为。如果 OTOH 您尝试插入不同的 PK:

^CCancel request sent
ERROR:  canceling statement due to user request
test=> rollback;
ROLLBACK
test=> begin;
BEGIN
test=> insert into foo(id) values(2);
INSERT 0 1
test=> \q

它在没有阻塞的情况下进行得很好。

关键是如果你正在做这种 PK/UQ 争用,你的 celery 任务无论如何都会序列化,或者至少应该序列化。

于 2012-05-04T01:52:46.417 回答
0

使用下面的代码,你应该可以为所欲为,而不仅仅是为了解决这个问题。

class SessionWrapper(Session):
    def commit(self, ignore=True):
        try:
            super(SessionWrapper, self).commit()
        except IntegrityError as e:
            if not ignore:
                raise e
            message = e.args[0]
            if "Duplicate entry" in message:
                logging.info("Error while executing %s.\n%s.", e.statement, message)
        finally:
            super(SessionWrapper, self).close()


def session(self, auto_commit=False):
    session_factory = sessionmaker(class_=SessionWrapper, bind=self.engine, autocommit=auto_commit)
    return scoped_session(session_factory)

Session = session()
s1 = Session()

p = Test(test="xxx", id=1)
s1.add(p)
s1.commit()
s1.close()
于 2021-08-05T17:53:52.027 回答
0

只需回滚并一一重试,就这么简单:

try:
    self._session.bulk_insert_mappings(mapper, items)
    self._session.commit()
except IntegrityError:
    self._session.rollback()
    logger.info("bulk inserting rows failed, fallback to insert one-by-one")
    for item in items:
        try:
            self._session.execute(insert(mapper).values(**item))
            self._session.commit()
        except SQLAlchemyError as e:
            logger.error("Error inserting item: %s for %s", item, e)
于 2021-08-08T16:10:11.280 回答