我正在使用 bulk_create 将数千行或行加载到 postgresql 数据库中。不幸的是,有些行导致 IntegrityError 并停止了 bulk_create 进程。我想知道是否有办法告诉 django 忽略这些行并尽可能多地保存批处理?
7 回答
这现在可以在 Django 2.2 上实现
Django 2.2ignore_conflicts
为该方法添加了一个新选项bulk_create
,来自文档:
在支持它的数据库上(除 PostgreSQL < 9.5 和 Oracle 之外的所有数据库),将 ignore_conflicts 参数设置为 True 会告诉数据库忽略插入任何违反约束的行(例如重复唯一值)的失败。启用此参数将禁用在每个模型实例上设置主键(如果数据库通常支持它)。
例子:
Entry.objects.bulk_create([
Entry(headline='This is a test'),
Entry(headline='This is only a test'),
], ignore_conflicts=True)
(注意:我不使用Django,所以可能有更合适的框架特定的答案)
Django 不可能通过简单地忽略INSERT
失败来做到这一点,因为 PostgreSQL 在第一个错误时中止整个事务。
Django 需要以下方法之一:
INSERT
单独事务中的每一行并忽略错误(非常慢);- 在每次插入之前创建一个
SAVEPOINT
(可能有缩放问题); - 仅当行不存在时才使用过程或查询插入(复杂且缓慢);或者
- 将数据批量插入或(更好)
COPY
到TEMPORARY
表中,然后将其合并到服务器端的主表中。
类似 upsert 的方法 (3) 似乎是个好主意,但是upsert 和 insert-if-not-exists 出奇地复杂。
就个人而言,我会采取(4):我会批量插入到一个新的单独表中,可能是UNLOGGED
or TEMPORARY
,然后我会运行一些手动 SQL 来:
LOCK TABLE realtable IN EXCLUSIVE MODE;
INSERT INTO realtable
SELECT * FROM temptable WHERE NOT EXISTS (
SELECT 1 FROM realtable WHERE temptable.id = realtable.id
);
LOCK TABLE ... IN EXCLUSIVE MODE
防止创建行的并发插入与上述语句完成的插入发生冲突并失败。它不会阻止并发SELECT
s,只有SELECT ... FOR UPDATE
、和INSERT
,所以从表中读取正常进行。UPDATE
DELETE
如果您不能长时间阻止并发写入,则可以改为使用可写 CTE 将行范围从temptable
into复制realtable
,如果失败则重试每个块。
一种不涉及手动 SQL 和临时表的快速而肮脏的解决方法是尝试批量插入数据。如果失败,请恢复为串行插入。
objs = [(Event), (Event), (Event)...]
try:
Event.objects.bulk_create(objs)
except IntegrityError:
for obj in objs:
try:
obj.save()
except IntegrityError:
continue
如果您有很多错误,这可能不是那么有效(您将花费更多时间连续插入而不是批量插入),但我正在处理一个重复很少的高基数数据集,所以这解决了我的大部分问题问题。
或者 5. 分而治之
我没有对此进行彻底的测试或基准测试,但它对我来说表现相当不错。YMMV,具体取决于您希望在批量操作中遇到多少错误。
def psql_copy(records):
count = len(records)
if count < 1:
return True
try:
pg.copy_bin_values(records)
return True
except IntegrityError:
if count == 1:
# found culprit!
msg = "Integrity error copying record:\n%r"
logger.error(msg % records[0], exc_info=True)
return False
finally:
connection.commit()
# There was an integrity error but we had more than one record.
# Divide and conquer.
mid = count / 2
return psql_copy(records[:mid]) and psql_copy(records[mid:])
# or just return False
即使在 Django 1.11 中也没有办法做到这一点。我找到了比使用 Raw SQL 更好的选择。它使用djnago-query-builder。它有一个upsert方法
from querybuilder.query import Query
q = Query().from_table(YourModel)
# replace with your real objects
rows = [YourModel() for i in range(10)]
q.upsert(rows, ['unique_fld1', 'unique_fld2'], ['fld1_to_update', 'fld2_to_update'])
注意:该库仅支持 postgreSQL
这是我用于批量插入的要点,它支持忽略 IntegrityErrors 并返回插入的记录。
pre Django 2.2 项目的迟到答案:
我最近遇到了这种情况,我找到了用第二个列表数组检查唯一性的方法。
就我而言,该模型具有唯一的共同检查,并且由于批量创建的数组中包含重复数据,因此批量创建会引发完整性错误异常。
所以我决定在批量创建对象列表之外创建清单。这是示例代码;唯一键是owner和brand,在此示例中 owner 是一个用户对象实例,而 brand 是一个字符串实例:
create_list = []
create_list_check = []
for brand in brands:
if (owner.id, brand) not in create_list_check:
create_list_check.append((owner.id, brand))
create_list.append(ProductBrand(owner=owner, name=brand))
if create_list:
ProductBrand.objects.bulk_create(create_list)
这对我有用,我
在线程中使用这个函数。
我的 csv 文件包含 120907 行。
def products_create():
full_path = os.path.join(settings.MEDIA_ROOT,'productcsv')
filename = os.listdir(full_path)[0]
logger.debug(filename)
logger.debug(len(Product.objects.all()))
if len(Product.objects.all()) > 0:
logger.debug("Products Data Erasing")
Product.objects.all().delete()
logger.debug("Products Erasing Done")
csvfile = os.path.join(full_path,filename)
csv_df = pd.read_csv(csvfile,sep=',')
csv_df['HSN Code'] = csv_df['HSN Code'].fillna(0)
row_iter = csv_df.iterrows()
logger.debug(row_iter)
logger.debug("New Products Creating")
for index, row in row_iter:
Product.objects.create(part_number = row[0],
part_description = row[1],
mrp = row[2],
hsn_code = row[3],
gst = row[4],
)
# products_list = [
# Product(
# part_number = row[0] ,
# part_description = row[1],
# mrp = row[2],
# hsn_code = row[3],
# gst = row[4],
# )
# for index, row in row_iter
# ]
# logger.debug(products_list)
# Product.objects.bulk_create(products_list)
logger.debug("Products uploading done")```