11

我正在尝试在 FastAPI 中连接到 mongodb。我反复收到此异常。

文件 - main.py

app = FastAPI(
    title=config.PROJECT_NAME, docs_url="/api/docs", openapi_url="/api"
)

@app.get("/api/testing")
async def testit():
    user_collection = readernetwork_db.get_collection("user_collection")
    all_users = await user_collection.find_one({"email": "sample_email"})
    print("all users --- ", all_users)
    return all_users

if __name__ == "__main__":
    uvicorn.run("main:app", host="0.0.0.0", reload=True, port=8888)

文件 - session.py

import motor.motor_asyncio
from app.core import config


print("here we go again....")
client = motor.motor_asyncio.AsyncIOMotorClient(
    config.MONGOATLAS_DATABASE_URI)
readernetwork_db = client.get_database("readernetwork")

例外 -:

all_users = await user_collection.find_one({"email": "sample_email"})

RuntimeError: Task <Task pending name='Task-4' coro=<RequestResponseCycle.run_asgi() running at /usr/local/lib/python3.8/site-packages/uvicorn/protocols/http/h11_impl.py:389> cb=[set.discard()]> got Future <Future pending cb=[_chain_future.<locals>._call_check_cancel() at /usr/local/lib/python3.8/asyncio/futures.py:360]> attached to a different loop

我不知道我在哪里弄错了。我应该为电机指定一个事件循环吗?

4

2 回答 2

9

您可以mongodb motor在全局范围内拥有客户端,但创建和关闭它应该在异步函数中完成。startup在应用程序的shutdown处理程序中执行此操作的最优选方式。像这样:

# mongodb.py
from motor.motor_asyncio import AsyncIOMotorClient


db_client: AsyncIOMotorClient = None


async def get_db_client() -> AsyncIOMotorClient:
    """Return database client instance."""
    return db_client


async def connect_db():
    """Create database connection."""
    global db_client
    db_client = AsyncIOMotorClient(DB_URL)

async def close_db():
    """Close database connection."""
    db_client.close()
# main.py
app = FastAPI(title=PROJECT_NAME)
...
app.add_event_handler("startup", connect_db)
app.add_event_handler("shutdown", close_db)

请注意,您需要该行global db_client来修改预先定义的全局变量。

于 2021-01-03T09:35:40.797 回答
0

无需使用全局...您可以从给任何路由的请求中访问应用程序状态。

#main.py
async def open_db() -> AsyncIOMotorClient:
    app.state.mongodb = AsyncIOMotorClient(DB_URL)

async def close_db():
    app.state.mongodb.close()

app.add_event_handler('startup', open_db)
app.add_event_handler('shutdown', close_db)

在对给定路由的每个请求中,您都可以访问应用程序状态。例如,

@app.route('/{username}')
async def index(request: Request, username: str):
    user = await request.app.state.mongodb['auth']['users'].find_one({"username" : username})

您甚至可以通过在 open_db 函数中执行类似的操作来使其更容易。将诸如“用户”之类的状态值指定为特定的集合实例。

async def open_db() -> AsyncIOMotorClient:
    app.state.mongodb = AsyncIOMotorClient(DB_URL)
    app.state.users = app.state.mongodb['auth']['users']

现在你可以这样做,

@app.route('/{username}')
async def index(request: Request, username: str):
    user = await request.app.state.users.find_one({"username" : username})
于 2022-02-16T17:21:04.917 回答