2

我正在批量插入 mongodb 数据库。我知道插入的 99% 的记录会因为重复键错误而失败。我想在插入后打印有多少新记录插入到数据库中。所有这些都是通过 tornado motor mongodb 驱动程序在 python 中完成的,但这可能并不重要。

try:
    bulk_write_result = yield db.collections.probe.insert(dataarray, continue_on_error=True)
    nr_inserts = bulk_write_result["nInserted"]
except pymongo.errors.DuplicateKeyError as e:
    nr_inserts = ????  <--- what should I put here?

由于抛出异常,bulk_write_result为空。显然,我可以(除了并发问题)在插入之前和之后对完整集合进行计数,但我不喜欢仅在日志文件中的一行额外往返数据库。那么有什么方法可以发现实际插入了多少条记录?

4

2 回答 2

3

我不清楚为什么你yield的插入结果。但是,关于批量插入

  • 您应该insert_manyinsert已弃用的方式使用;
  • ordered关键字设置为 时False,如果出现错误,您的插入将继续;
  • 如果出现错误,insert_many将引发一个BulkWriteError,您可以查询以获取插入文档的数量。

所有这些都导致了这样的事情:

try:
  insert_many_result = db.collections.probe.insert_many(dataaray,ordered=False)
  nr_inserts = len(insert_many_result.inserted_ids)
except pymongo.errors.BulkWriteError as bwe:
  nr_inserts = bwe.details["nInserted"]

如果您需要确定写入错误背后的原因,则必须检查bwe.details['writeErrors']数组。代码值 11000 表示“重复键错误”:

>>> pprint(e.details['writeErrors'])
[{'code': 11000,
  'errmsg': 'E11000 duplicate key error index: test.w.$k_1 dup key: { : 1 }',
  'index': 0,
  'op': {'_id': ObjectId('555465cacf96c51208587eac'), 'k': 1}},
 {'code': 11000,
  'errmsg': 'E11000 duplicate key error index: test.w.$k_1 dup key: { : 3 }',
  'index': 1,
  'op': {'_id': ObjectId('555465cacf96c51208587ead'), 'k': 3}}

在这里,如您所见,我尝试在db的w集合中插入两个文档。test由于重复键错误,两次插入都失败了。

于 2015-05-14T09:00:11.923 回答
2

带有 continue_on_error 的常规插入无法报告您想要的信息。但是,如果您使用的是 MongoDB 2.6 或更高版本,我们有一个具有良好错误报告的高性能解决方案。这是一个使用 Motor 的 BulkOperationBuilder 的完整示例:

import pymongo.errors
from tornado import gen
from tornado.ioloop import IOLoop
from motor import MotorClient

db = MotorClient()
dataarray = [{'_id': 0},
             {'_id': 0},  # Duplicate.
             {'_id': 1}]


@gen.coroutine
def my_insert():
    try:
        bulk = db.collections.probe.initialize_unordered_bulk_op()

        # Prepare the operation on the client.
        for doc in dataarray:
            bulk.insert(doc)

        # Send to the server all at once.
        bulk_write_result = yield bulk.execute()
        nr_inserts = bulk_write_result["nInserted"]
    except pymongo.errors.BulkWriteError as e:
        print(e)
        nr_inserts = e.details['nInserted']

    print('nr_inserts: %d' % nr_inserts)


IOLoop.instance().run_sync(my_insert)

完整文档: http: //motor.readthedocs.org/en/stable/examples/bulk.html

注意关于 MongoDB 2.6 之前的批量插入性能差的警告!它仍然可以工作,但每个文档需要单独的往返。在 2.6+ 中,驱动程序将整个操作一次往返发送到服务器,服务器会报告成功的次数和失败的次数。

于 2015-05-14T14:45:54.177 回答