7

嗨,我正在使用 AsyncIOMotorClient 对 mongoDb 进行异步数据库调用。下面是我的代码。

xyz.py
async def insertMany(self,collection_name,documents_to_insert):
    try:
        collection=self.database[collection_name]
        document_inserted = await collection.insert_many(documents_to_insert)
        return document_inserted
    except Exception:
        raise

def insertManyFn(self,collection_name,documents_to_insert):
    try:
        loop=asyncio.new_event_loop()
        asyncio.set_event_loop(loop)
        loop1=asyncio.get_event_loop()
        inserted_documents_count = loop1.run_until_complete(self.insertMany(collection_name, documents_to_insert))
        if inserted_documents_count==len(documents_to_insert):
            document_to_insert={Config.DB_JOB_COLUMN:Job.job_id,Config.DB_JOB_RESULT_COLUMN:Config.DB_JOB_RESULT_SUCCESS}
            loop1.run_until_complete(self.insertOne(Config.DB_JOB_COLLECTION, document_to_insert))
    except Exception:
        raise

xyz1.py
t=Timer(10,xyz.insertManyFn,\
                (collection_name,documents_to_insert))
t.start()   

运行此程序时,我遇到了异常

RuntimeError: Task <Task pending coro=<xyz.insertMany() running at <my workspace location>/xyz.py:144> cb=[_run_until_complete_cb() at /usr/lib64/python3.5/asyncio/base_events.py:164]> got Future <Future pending cb=[_chain_future.<locals>._call_check_cancel() at /usr/lib64/python3.5/asyncio/futures.py:431]> attached to a different loop

在上面的程序中 insertManyFn 将在 10 秒后被调用并执行插入操作。但是当它第一次调用 insertMany 时,我遇到了一个异常。

4

3 回答 3

9

我仍然希望我的 MotorClient 位于模块的顶层,所以这就是我所做的:我修补MotorClient.get_io_loop以始终返回当前循环。

import asyncio
import motor.core

from motor.motor_asyncio import (
    AsyncIOMotorClient as MotorClient,
)

# MongoDB client
client = MotorClient('mongodb://localhost:27017/test')
client.get_io_loop = asyncio.get_running_loop

# The current database ("test")
db = client.get_default_database()


# async context
async def main():
    posts = db.posts
    await posts.insert_one({'title': 'great success!')


# Run main()
asyncio.run(main())
于 2021-09-05T16:34:23.963 回答
6

根据文档AsyncIOMotorClient如果您不使用默认值,则应该传递一个 ioloop。创建事件循环后尝试创建客户端:

loop=asyncio.new_event_loop()
asyncio.set_event_loop(loop)
client = AsyncIOMotorClient(io_loop=loop)
于 2017-01-11T08:25:35.197 回答
0

我已经修改了代码并且它正在工作。

def insertManyFn(self,loop,collection_name,documents_to_insert):
    try:
        inserted_documents_count = loop.run_until_complete(self.insertMany(event_loop,collection_name, documents_to_insert))
        if len(inserted_documents_count)==len(documents_to_insert):
            document_to_insert={Config.DB_JOB_COLUMN:Job.job_id,Config.DB_JOB_RESULT_COLUMN:Config.DB_JOB_RESULT_SUCCESS}
            loop1.run_until_complete(self.insertOne(Config.DB_JOB_COLLECTION, document_to_insert))
    except Exception:
        raise

loop=asyncio.get_event_loop()   
t=Timer(10,self.xyz.insertManyFn,(loop,collection_name,documents_to_insert))
t.start()

说明 - 我正在使用 python 线程计时器,它创建自己的线程以在一定时间后执行函数。所以,在这个线程中,我得到了不应该是正确方法的事件循环,它应该首先得到事件循环并在其中创建一个计时器线程。我想这是唯一的原因。

于 2017-01-11T11:00:36.723 回答