10

我将几条 10k 记录插入到具有 REF 完整性规则的数据库中。不幸的是,一些数据行是重复的(因为它们已经存在于数据库中)。在插入数据库之前检查数据库中每一行的存在成本太高了,所以我打算继续处理 SQLAlchemy 抛出的 IntegrityError 异常,记录错误然后继续。

我的代码将如下所示:

# establish connection to db etc.

tbl = obtain_binding_to_sqlalchemy_orm()
datarows = load_rows_to_import()

try:
    conn.execute(tbl.insert(), datarows)
except IntegrityError as ie:
    # eat error and keep going
except Exception as e:
    # do something else

我在上面所做的(隐式)假设是 SQLAlchemy 没有将多个插入滚动到一个事务中。如果我的假设是错误的,那么这意味着如果发生 IntegrityError,则插入的其余部分将被中止。任何人都可以确认上面的伪代码“模式”是否会按预期工作 - 或者我是否会由于抛出 IntegrityError 异常而最终丢失数据?

另外,如果有人对此有更好的想法,我会很感兴趣。

4

3 回答 3

2

如果您之前没有启动任何事务,它可能会像这样工作,因为在这种情况下 sqlalchemy 的自动提交功能将启动。但您应该按照链接中的描述显式设置。

于 2012-05-14T15:46:38.063 回答
0

我在解析 ASCII 数据文件以将数据导入表时也遇到了这个问题。问题是我本能地和直觉地希望 SQLAlchemy 在允许唯一数据的同时跳过重复的行。或者,由于当前的 SQL 引擎,例如不允许使用 unicode 字符串,可能会引发随机错误。

但是,这种行为超出了 SQL 接口的定义范围。SQL API,因此 SQLAlchemy 只理解事务和提交,而不考虑这种选择性行为。此外,依赖自动提交功能听起来很危险,因为插入会在异常发生后停止,留下其余数据。

我的解决方案(我不确定它是否是最优雅的)是处理循环中的每一行,捕获并记录异常,并在最后提交更改。

假设您以某种方式获取列表列表中的数据,即作为列值列表的行列表。然后你在一个循环中读取每一行:

# Python 3.5
from sqlalchemy import Table, create_engine
import logging

# Create the engine
# Create the table
# Parse the data file and save data in `rows`

conn = engine.connect() 
trans = conn.begin() # Disables autocommit

exceptions = {}
totalRows = 0
importedRows = 0

ins = table.insert()

for currentRowIdx, cols in enumerate(rows):
    try:
        conn.execute(ins.values(cols)) # try to insert the column values
        importedRows += 1

    except Exception as e:
        exc_name = type(e).__name__ # save the exception name
        if not exc_name in exceptions:
            exceptions[exc_name] = []
        exceptions[exc_name].append(currentRowIdx)

    totalRows += 1

for key, val in exceptions.items():
    logging.warning("%d out of %d lines were not imported due to %s."%(len(val), totalRows, key))

logging.info("%d rows were imported."%(importedRows))

trans.commit() # Commit at the very end
conn.close()

为了最大限度地提高此操作的速度,您应该禁用自动提交。我将此代码与 SQLite 一起使用,它仍然比我使用 only 的旧版本慢 3-5 倍sqlite3,即使禁用了自动提交。(我移植到 SQLAlchemy 的原因是能够将它与 MySQL 一起使用。)

它不是最优雅的解决方案,因为它不如 SQLite 的直接接口快。如果我分析代码并在不久的将来找到瓶颈,我将用解决方案更新这个答案。

于 2016-01-21T14:44:35.313 回答
0

几乎没有办法告诉 sql 引擎做一个bulk insert on duplicate ignore动作。但是,我们可以尝试在 python 端做一个后备解决方案。如果您的副本没有以非常糟糕的方式分发*,那么这几乎可以同时获得两全其美的好处。

try:
    # by very bad, I mean what if each batch of the items contains one duplicate
    session.bulk_insert_mappings(mapper, items)
    session.commit()
except IntegrityError:
    logger.info("bulk inserting rows failed, fallback to one by one")
    for item in items:
        try:
            session.execute(insert(mapper).values(**item))
            session.commit()
        except SQLAlchemyError:
            logger.exception("Error inserting item: %s", item)
于 2021-08-08T03:09:51.113 回答