我有一个简单的回声服务器功能。如果我用一个测试启动 pytest,一切正常,但如果我用两个测试启动它,那么第二个会挂起等待服务器启动,我不明白为什么。这是一个完整的代码。
服务器:
async def handle_echo(reader, writer):
data = await reader.read(100)
message = data.decode()
addr = writer.get_extra_info('peername')
print(f"SERVER: Received {message!r} from {addr!r}")
writer.write(data)
await writer.drain()
print(f"SERVER: Sent: {message!r}")
writer.close()
print("SERVER: Closed the connection")
测试设置:
HOST = "localhost"
@pytest.fixture()
def event_loop():
return asyncio.get_event_loop()
async def _async_wait_for_server(event_loop, addr, port):
while True:
a_socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
try:
await event_loop.sock_connect(a_socket, (addr, port))
return
except ConnectionRefusedError:
await asyncio.sleep(0.001)
finally:
a_socket.close()
@pytest.fixture()
def server(event_loop):
unused_tcp_port = 65432
cancel_handle = asyncio.ensure_future(main(unused_tcp_port), loop=event_loop)
event_loop.run_until_complete(asyncio.wait_for(
_async_wait_for_server(event_loop, HOST, unused_tcp_port), 5.0))
try:
yield unused_tcp_port
finally:
cancel_handle.cancel()
async def main(port):
server = await asyncio.start_server(handle_echo, HOST, port)
addr = server.sockets[0].getsockname()
print(f'SERVER: Serving on {addr[0:2]}')
async with server:
await server.serve_forever()
测试:
@pytest.mark.asyncio
async def test_something(server):
message = "Foobar!"
reader, writer = await asyncio.open_connection(HOST, server)
print(f'CLIENT: Sent {message!r}')
writer.write(message.encode())
await writer.drain()
data = await reader.read(100)
print(f'CLIENT: Received {data.decode()!r}')
print('CLIENT: Close the connection')
writer.close()
await writer.wait_closed()
@pytest.mark.asyncio
async def test_another_thing(server):
# Literally the same
Pytest输出:================================================ =================== 测试会话开始 ============================ ======================================== 平台 win32 -- Python 3.7.0, pytest- 6.2.5、py-1.10.0、pluggy-1.0.0 -- C:\Users\hurrd.virtualenvs\chatServer-iuEQ-ghN\Scripts\python.exe 缓存目录:.pytest_cache 根目录:C:\Users\hurrd\ PycharmProjects\chatServer 插件:asyncio-0.16.0 收集到 2 项
测试/单元/test_themes.py::test_something 已通过 [50%]
测试/单元/test_themes.py::test_another_thing