
How can I do a bulk update in pymongo? I want to update a bunch of entries, and doing them one at a time is very slow.

An answer to a nearly identical question is here: Bulk update/upsert in MongoDB?

The accepted answer doesn't actually answer the question. It simply gives a link to the mongo CLI for doing imports/exports.

I'm also open to someone explaining why a bulk upsert is not possible or not a best practice, but please explain what the preferred solution to this sort of problem is.


6 Answers


Modern releases of pymongo (greater than 3.x) wrap bulk operations in a consistent interface that downgrades where the server release does not support bulk operations. This is now consistent across the officially supported MongoDB drivers.

So the preferred method for coding is to use bulk_write() instead, where you use an UpdateOne or another appropriate operation action. And of course it is now preferred to use plain lists of operations rather than a specific builder.

The direct translation of the old documentation:

from pymongo import UpdateOne

operations = [
    UpdateOne({ "field1": 1},{ "$push": { "vals": 1 } },upsert=True),
    UpdateOne({ "field1": 1},{ "$push": { "vals": 2 } },upsert=True),
    UpdateOne({ "field1": 1},{ "$push": { "vals": 3 } },upsert=True)
]

result = collection.bulk_write(operations)

Or the classic document transformation loop:

import random
from pymongo import UpdateOne

random.seed()

operations = []

for doc in collection.find():
    # Set a random number on every document update
    operations.append(
        UpdateOne({ "_id": doc["_id"] },{ "$set": { "random": random.randint(0,10) } })
    )

    # Send once every 1000 in batch
    if ( len(operations) == 1000 ):
        collection.bulk_write(operations,ordered=False)
        operations = []

if ( len(operations) > 0 ):
    collection.bulk_write(operations,ordered=False)

The returned BulkWriteResult will contain counters of matched and updated documents, as well as the returned _id values for any "upserts" that occur.
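For instance, a minimal sketch of reading those counters (assuming result is the BulkWriteResult returned by the bulk_write() call in the first example above):

# result is the BulkWriteResult from collection.bulk_write(operations)
print(result.matched_count)    # documents matched by the update filters
print(result.modified_count)   # documents actually modified
print(result.upserted_count)   # number of upserts performed
print(result.upserted_ids)     # dict of {operation index: _id} for each upsert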

There is some misconception about the size of a bulk operations array. The actual request sent to the server cannot exceed the 16MB BSON limit, since that limit also applies to the "request" sent to the server, which uses BSON format.

However, that does not govern the size of the request array you can build, because the actual operations will only be sent and processed in batches of 1000 anyway. The only real restriction is that those 1000 operation instructions themselves do not produce a BSON document larger than 16MB, which is a pretty tall order.

The general concept of bulk methods is "less traffic": many things are sent at once and only one server response is handled. Reducing the overhead attached to every single update request saves lots of time.

answered 2016-03-25T03:51:51.070

MongoDB 2.6+ has support for bulk operations. This includes bulk inserts, upserts, updates, etc. The point of this is to reduce/eliminate delays from the round-trip latency of doing record-by-record operations ('document by document' to be correct).

So, how does this work? Example in Python, because that's what I'm working in.

>>> import pymongo
>>> pymongo.version
'2.7rc0'

To use this feature, we create a 'bulk' object, add documents to it, then call execute on it and it will send all the updates at once. Caveats: the BSON size of the collected operations (the sum of their BSON sizes) cannot exceed the 16 MB document size limit. The number of operations that fit can therefore vary significantly; your mileage may vary.

Example in Pymongo of Bulk upsert operation:

import pymongo
conn = pymongo.MongoClient('myserver', 8839)
db = conn['mydbname']
coll = db.myCollection
bulkop = coll.initialize_ordered_bulk_op()
retval = bulkop.find({'field1':1}).upsert().update({'$push':{'vals':1}})
retval = bulkop.find({'field1':1}).upsert().update({'$push':{'vals':2}})
retval = bulkop.find({'field1':1}).upsert().update({'$push':{'vals':3}})
retval = bulkop.execute()

This is the essential method. More info available at:

http://api.mongodb.org/python/2.7rc1/examples/bulk.html

Edit: as of version 3.5 of the Python driver, initialize_ordered_bulk_op is deprecated. Use bulk_write() instead. [ http://api.mongodb.com/python/current/api/pymongo/collection.html#pymongo.collection.Collection.bulk_write ]
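For comparison, a rough bulk_write() equivalent of the builder example above might look like this (a sketch, assuming the same coll object):

from pymongo import UpdateOne

requests = [
    UpdateOne({'field1': 1}, {'$push': {'vals': 1}}, upsert=True),
    UpdateOne({'field1': 1}, {'$push': {'vals': 2}}, upsert=True),
    UpdateOne({'field1': 1}, {'$push': {'vals': 3}}, upsert=True),
]
# ordered=True (the default) mirrors initialize_ordered_bulk_op()
result = coll.bulk_write(requests)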

answered 2014-03-25T00:11:09.290

If you have a lot of data and you want to use "_id" to decide whether each record already exists,

you can try...

import pymongo
from pymongo import UpdateOne
client = pymongo.MongoClient('localhost', 27017)
db=client['sampleDB']

collectionInfo = db.sample

#sample data
datas=[
    {"_id":123456,"name":"aaa","N":1,"comment":"first sample","lat":22,"lng":33},
    {"_id":234567,"name":"aaa","N":1,"comment":"second sample","lat":22,"lng":33},
    {"_id":345678,"name":"aaa","N":1,"comment":"xxx sample","lat":22,"lng":33},
    {"_id":456789,"name":"aaa","N":1,"comment":"yyy sample","lat":22,"lng":33},
    {"_id":123456,"name":"aaaaaaaaaaaaaaaaaa","N":1,"comment":"zzz sample","lat":22,"lng":33},
    {"_id":11111111,"name":"aaa","N":1,"comment":"zzz sample","lat":22,"lng":33}
]

# split the matching key (_id) from the rest of the document data
ids = [data.pop("_id") for data in datas]

operations = [UpdateOne({"_id": idn}, {"$set": data}, upsert=True) for idn, data in zip(ids, datas)]

collectionInfo.bulk_write(operations)

My English is poor; sorry if you can't understand what I'm saying.

answered 2018-01-25T02:18:06.460

You can update all documents that match your query spec using multi=True.
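As a sketch (the field names here are made up for illustration; multi=True belongs to the legacy update() method, and update_many() is the modern equivalent):

# Legacy pymongo 2.x style: update every document matching the query
collection.update({'status': 'pending'}, {'$set': {'processed': True}}, multi=True)

# Modern equivalent
collection.update_many({'status': 'pending'}, {'$set': {'processed': True}})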

There is a bug filed about executing a batch of commands the way you want here.

answered 2011-03-14T18:15:35.193

The answer remains the same: bulk upserts are not supported.

answered 2011-03-13T21:45:31.227

Fastest bulk update with Python 3.5+, motor, and asyncio:

import asyncio
import datetime
import logging
import random
import time

import motor.motor_asyncio
import pymongo.errors


async def execute_bulk(bulk):
    try:
        await bulk.execute()
    except pymongo.errors.BulkWriteError as err:
        logging.error(err.details)


async def main():
    cnt = 0
    bulk = db.initialize_unordered_bulk_op()
    tasks = []
    async for document in db.find({}, {}, no_cursor_timeout=True):
        cnt += 1
        bulk.find({'_id': document['_id']}).update({'$set': {"random": random.randint(0,10)}})
        if not cnt % 1000:
            task = asyncio.ensure_future(execute_bulk(bulk))
            tasks.append(task)
            bulk = db.initialize_unordered_bulk_op()
    if cnt % 1000:
        task = asyncio.ensure_future(execute_bulk(bulk))
        tasks.append(task)
    logging.info('%s processed', cnt)
    await asyncio.gather(*tasks)


logging.basicConfig(level='INFO')    
db = motor.motor_asyncio.AsyncIOMotorClient()['database']['collection']
start_time = time.time()
loop = asyncio.get_event_loop()
try:
    loop.run_until_complete(main())
finally:
    execution_time = time.time() - start_time
    logging.info('Execution time: %s', datetime.timedelta(seconds=execution_time))
answered 2016-12-07T17:41:56.563