0

我正在尝试在 pytest 夹具中使用 asyncio 运行服务器

@pytest.fixture(autouse=True)
@pytest.mark.asyncio
async def start_endpoints(
    endpoint1: ServerEndpoint,
    endpoint2: ServerEndpoint
):
    pool = ThreadPoolExecutor(max_workers=2)
    loop = asyncio.get_running_loop()

    await loop.run_in_executor(pool, endpoint1.start)
    await loop.run_in_executor(pool, endpoint2.start)

方法start如下

async def start(self):
        try:

            server = await asyncio.start_server(self.handle_req, self.addr, self.port)
            addr = server.sockets[0].getsockname()
            print(f'{self.name}: serving on {addr}')

            async with server:
                await server.serve_forever()

而一旦尝试打开与服务器的连接,测试就会打印此错误

self = <_WindowsSelectorEventLoop running=False closed=False debug=False>
fut = <Future finished exception=ConnectionRefusedError(10061, "Connect call failed ('127.0.0.1', 9000)")>
sock = <socket.socket [closed] fd=-1, family=AddressFamily.AF_INET, type=SocketKind.SOCK_STREAM, proto=6>
address = ('127.0.0.1', 9000)

    def _sock_connect_cb(self, fut, sock, address):
        if fut.cancelled():
            return
    
        try:
            err = sock.getsockopt(socket.SOL_SOCKET, socket.SO_ERROR)
            if err != 0:
                # Jump to any except clause below.
>               raise OSError(err, f'Connect call failed {address}')
E               ConnectionRefusedError: [Errno 10061] Connect call failed ('127.0.0.1', 9000)

编辑:问题是事件循环在之后立即关闭,所以我试图用标记我所有的灯具,(scope="module")但现在我得到了

ScopeMismatch: You tried to access the 'function' scoped fixture 'event_loop' with a 'module' scoped request object, involved factories
test\e2e\test_peer.py:380:  def start_endpoints

编辑2:

所以我添加了event_loop夹具

@pytest.fixture(scope="module")
def event_loop():
    loop = asyncio.get_event_loop()
    yield loop
    loop.close()

这应该覆盖每个夹具的默认循环,使用@pytest.mark.asyncio.

@pytest.fixture(autouse=True, scope="module")
@pytest.mark.asyncio
async def start_endpoints(
    event_loop,
    endpoint1: ServerEndpoint,
    endpoint2: ServerEndpoint
):
    pool = ThreadPoolExecutor(max_workers=2)

    await event_loop.run_in_executor(pool, endpoint1.start)
    await event_loop.run_in_executor(pool, endpoint2.start)

通过在我的测试中进行调试,event_loop等于我存储在ServerEndpoint(即asyncio.get_running_loop())内的循环,但我仍然得到ConnectionRefusedError

4

1 回答 1

1

可能它应该像这样工作:

async def handle(reader, writer):
    data = await reader.read(100)

    message = data.decode()
    print(f"SERVER: Received {message!r}")
    writer.write(data)
    await writer.drain()
    print(f"SERVER: Sent: {message!r}")

    writer.close()
    print("SERVER: Closed the connection")

async def start():
    server = await asyncio.start_server(handle, host, port)

    addr = server.sockets[0].getsockname()
    print(f'Server is running on {addr[0:2]}')

    async with server:
        await server.serve_forever()


async def _async_wait_for_server(event_loop, host, port):
    while True:
        a_socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
        try:
            await event_loop.sock_connect(a_socket, (host, port))
            return
        except ConnectionRefusedError:
            await asyncio.sleep(0.01)

        finally:
            a_socket.close()

@pytest.fixture()
def server(event_loop, host, port):
    cancel_handle = asyncio.ensure_future(start(host, port), loop=event_loop)
    event_loop.run_until_complete(
        asyncio.wait_for(_async_wait_for_server(event_loop, host, port), 5.0)
    )

    try:
        yield
    finally:
        cancel_handle.cancel()

但我建议您以另一种方式进行功能测试:

  • 为服务器创建 docker 镜像和用于测试的镜像
  • 创建 docker-compose.yml 文件(使用 depends_on 和 healthcheck 用于测试容器)
  • 启动服务器后运行测试
于 2021-06-19T13:13:11.380 回答