58

我正在使用 bulk_create 将数千行或行加载到 postgresql 数据库中。不幸的是,有些行导致 IntegrityError 并停止了 bulk_create 进程。我想知道是否有办法告诉 django 忽略这些行并尽可能多地保存批处理?

4

7 回答 7

81

这现在可以在 Django 2.2 上实现

Django 2.2ignore_conflicts为该方法添加了一个新选项bulk_create,来自文档

在支持它的数据库上(除 PostgreSQL < 9.5 和 Oracle 之外的所有数据库),将 ignore_conflicts 参数设置为 True 会告诉数据库忽略插入任何违反约束的行(例如重复唯一值)的失败。启用此参数将禁用在每个模型实例上设置主键(如果数据库通常支持它)。

例子:

Entry.objects.bulk_create([
    Entry(headline='This is a test'),
    Entry(headline='This is only a test'),
], ignore_conflicts=True)
于 2019-01-14T21:33:10.913 回答
9

(注意:我不使用Django,所以可能有更合适的框架特定的答案)

Django 不可能通过简单地忽略INSERT失败来做到这一点,因为 PostgreSQL 在第一个错误时中止整个事务。

Django 需要以下方法之一:

  1. INSERT单独事务中的每一行并忽略错误(非常慢);
  2. 在每次插入之前创建一个SAVEPOINT(可能有缩放问题);
  3. 仅当行不存在时才使用过程或查询插入(复杂且缓慢);或者
  4. 将数据批量插入或(更好)COPYTEMPORARY表中,然后将其合并到服务器端的主表中。

类似 upsert 的方法 (3) 似乎是个好主意,但是upsert 和 insert-if-not-exists 出奇地复杂

就个人而言,我会采取(4):我会批量插入到一个新的单独表中,可能是UNLOGGEDor TEMPORARY,然后我会运行一些手动 SQL 来:

LOCK TABLE realtable IN EXCLUSIVE MODE;

INSERT INTO realtable 
SELECT * FROM temptable WHERE NOT EXISTS (
    SELECT 1 FROM realtable WHERE temptable.id = realtable.id
);

LOCK TABLE ... IN EXCLUSIVE MODE防止创建行的并发插入与上述语句完成的插入发生冲突并失败。它不会阻止并发SELECTs,只有SELECT ... FOR UPDATE、和INSERT,所以从表中读取正常进行。UPDATEDELETE

如果您不能长时间阻止并发写入,则可以改为使用可写 CTE 将行范围从temptableinto复制realtable,如果失败则重试每个块。

于 2012-09-17T02:03:29.827 回答
9

一种不涉及手动 SQL 和临时表的快速而肮脏的解决方法是尝试批量插入数据。如果失败,请恢复为串行插入。

objs = [(Event), (Event), (Event)...]

try:
    Event.objects.bulk_create(objs)

except IntegrityError:
    for obj in objs:
        try:
            obj.save()
        except IntegrityError:
            continue

如果您有很多错误,这可能不是那么有效(您将花费更多时间连续插入而不是批量插入),但我正在处理一个重复很少的高基数数据集,所以这解决了我的大部分问题问题。

于 2017-06-02T16:46:42.967 回答
2

或者 5. 分而治之

我没有对此进行彻底的测试或基准测试,但它对我来说表现相当不错。YMMV,具体取决于您希望在批量操作中遇到多少错误。

def psql_copy(records):
    count = len(records)
    if count < 1:
        return True
    try:
        pg.copy_bin_values(records)
        return True
    except IntegrityError:
        if count == 1:
            # found culprit!
            msg = "Integrity error copying record:\n%r"
            logger.error(msg % records[0], exc_info=True)
            return False
    finally:
        connection.commit()

    # There was an integrity error but we had more than one record.
    # Divide and conquer.
    mid = count / 2
    return psql_copy(records[:mid]) and psql_copy(records[mid:])
    # or just return False
于 2013-05-02T17:42:59.953 回答
0

即使在 Django 1.11 中也没有办法做到这一点。我找到了比使用 Raw SQL 更好的选择。它使用djnago-query-builder。它有一个upsert方法

from querybuilder.query import Query
q = Query().from_table(YourModel)
# replace with your real objects
rows = [YourModel() for i in range(10)] 
q.upsert(rows, ['unique_fld1', 'unique_fld2'], ['fld1_to_update', 'fld2_to_update'])

注意:该库仅支持 postgreSQL

这是我用于批量插入的要点,它支持忽略 IntegrityErrors 并返回插入的记录。

于 2017-11-10T09:13:15.273 回答
0

pre Django 2.2 项目的迟到答案:

我最近遇到了这种情况,我找到了用第二个列表数组检查唯一性的方法。

就我而言,该模型具有唯一的共同检查,并且由于批量创建的数组中包含重复数据,因此批量创建会引发完整性错误异常。

所以我决定在批量创建对象列表之外创建清单。这是示例代码;唯一键是ownerbrand,在此示例中 owner 是一个用户对象实例,而 brand 是一个字符串实例:

create_list = []
create_list_check = []
for brand in brands:
    if (owner.id, brand) not in create_list_check:
        create_list_check.append((owner.id, brand))
        create_list.append(ProductBrand(owner=owner, name=brand))

if create_list:
    ProductBrand.objects.bulk_create(create_list)
于 2019-10-31T08:30:23.633 回答
0

这对我有用,我
在线程中使用这个函数。
我的 csv 文件包含 120907 行。

def products_create():
    full_path = os.path.join(settings.MEDIA_ROOT,'productcsv')
    filename = os.listdir(full_path)[0]
    logger.debug(filename)
    logger.debug(len(Product.objects.all()))
    if len(Product.objects.all()) > 0:
        logger.debug("Products Data Erasing")
        Product.objects.all().delete()
        logger.debug("Products Erasing Done")

    csvfile = os.path.join(full_path,filename)
    csv_df = pd.read_csv(csvfile,sep=',')
    csv_df['HSN Code'] = csv_df['HSN Code'].fillna(0)
    row_iter = csv_df.iterrows()
    logger.debug(row_iter)

    logger.debug("New Products Creating")

    for index, row in row_iter:
        Product.objects.create(part_number = row[0],
                               part_description = row[1],
                               mrp = row[2],
                               hsn_code = row[3],
                               gst = row[4],
                               )

    # products_list = [
    #           Product(
    #               part_number = row[0] ,
    #               part_description = row[1],
    #               mrp = row[2],
    #               hsn_code = row[3],
    #               gst = row[4],
    #           )
    #           for index, row in row_iter

    #       ]
    # logger.debug(products_list)

    # Product.objects.bulk_create(products_list)
    logger.debug("Products uploading done")```

于 2022-01-13T13:20:23.537 回答