I am not able to run this test; I always get the same error: RuntimeError: Event loop is closed

What do I need to add to this code?

from motor.motor_asyncio import AsyncIOMotorClient
import pytest
import asyncio

client = AsyncIOMotorClient("mongodb://mongo:mongo@192.168.0.11:27017/admin?retryWrites=false")
db = client['app']
aux = db['users']

async def create_user_db(a: dict):
    x = await aux.insert_one(a)
    return x

@pytest.mark.asyncio
async def test_create():
    form = {'username': 'c3', 'password': 'c3'}
    res = await create_user_db(form)
    assert res != None

The error was attached as a screenshot (image not available).

1 Answer

In your example, you open the database at import time, when there is still no event loop. The event loop is only created when the test case runs.
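To illustrate why the moment of creation matters, here is a minimal sketch using only the standard library. LoopBoundClient is a hypothetical stand-in, not Motor's actual implementation; it only assumes that a client keeps a reference to the event loop it was created with and schedules its work there. The first demo builds the client against a loop that is closed before the test's own loop runs, the second builds it while the test's loop is running.

```python
import asyncio


class LoopBoundClient:
    """Hypothetical stand-in for a client that remembers the loop it was created with."""

    def __init__(self, loop):
        self._loop = loop

    def insert_one(self, doc):
        # Work is scheduled on the remembered loop, not on the loop that is
        # running the caller.
        fut = self._loop.create_future()
        self._loop.call_soon(fut.set_result, doc)
        return fut


def demo_created_too_early():
    # The client is bound to a loop that gets closed before the test runs.
    first_loop = asyncio.new_event_loop()
    client = LoopBoundClient(first_loop)
    first_loop.close()

    async def use_client():
        return await client.insert_one({"username": "c3"})

    # The test itself runs on a different, fresh loop.
    second_loop = asyncio.new_event_loop()
    try:
        second_loop.run_until_complete(use_client())
    except RuntimeError as exc:
        return str(exc)
    finally:
        second_loop.close()
    return None


def demo_created_inside_loop():
    # The client is created while the test's loop is running and binds to it.
    async def create_and_use():
        client = LoopBoundClient(asyncio.get_running_loop())
        return await client.insert_one({"username": "c3"})

    loop = asyncio.new_event_loop()
    try:
        return loop.run_until_complete(create_and_use())
    finally:
        loop.close()
```

Calling demo_created_too_early() returns the error message instead of a result, while demo_created_inside_loop() returns the inserted document. Whether a real client behaves this way depends on the library and its version, so treat this as a model of the failure rather than a description of Motor's internals.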

You can define the database as a fixture and pass it to the test function, for example:

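import pytest
from motor.motor_asyncio import AsyncIOMotorClient


# Build the client in a fixture, at test time, instead of at import time.
@pytest.fixture
def client():
    return AsyncIOMotorClient("mongodb://localhost:27017/")


@pytest.fixture
def db(client):
    return client['test']


@pytest.fixture
def collection(db):
    return db['test']


# The collection is passed in explicitly rather than read from a module-level global.
async def create_user_db(collection, a: dict):
    x = await collection.insert_one(a)
    return x


@pytest.mark.asyncio
async def test_create(collection):
    form = {'username': 'c3', 'password': 'c3'}
    res = await create_user_db(collection, form)
    assert res is not None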
Answered 2022-02-18T13:01:19.817