100

asyncio使用 Python 3.4库为代码编写单元测试的最佳方法是什么?假设我想测试一个 TCP 客户端 ( SocketConnection):

import asyncio
import unittest

class TestSocketConnection(unittest.TestCase):
    def setUp(self):
        self.mock_server = MockServer("localhost", 1337)
        self.socket_connection = SocketConnection("localhost", 1337)

    @asyncio.coroutine
    def test_sends_handshake_after_connect(self):
        yield from self.socket_connection.connect()
        self.assertTrue(self.mock_server.received_handshake())

当使用默认测试运行器运行此测试用例时,测试将始终成功,因为该方法只执行到第一yield from条指令,之后它在执行任何断言之前返回。这会导致测试始终成功。

是否有能够处理这样的异步代码的预构建测试运行器?

4

10 回答 10

121

由于 Python 3.8 unittest附带了为此目的而设计的IsolatedAsyncioTestCase函数。

from unittest import IsolatedAsyncioTestCase

class Test(IsolatedAsyncioTestCase):

    async def test_functionality(self):
        result = await functionality()
        self.assertEqual(expected, result)
于 2019-12-14T09:41:28.597 回答
58

我使用受 Tornado 的gen_test启发的装饰器暂时解决了这个问题:

def async_test(f):
    def wrapper(*args, **kwargs):
        coro = asyncio.coroutine(f)
        future = coro(*args, **kwargs)
        loop = asyncio.get_event_loop()
        loop.run_until_complete(future)
    return wrapper

就像 JF Sebastian 建议的那样,这个装饰器将阻塞,直到测试方法协程完成。这允许我编写这样的测试用例:

class TestSocketConnection(unittest.TestCase):
    def setUp(self):
        self.mock_server = MockServer("localhost", 1337)
        self.socket_connection = SocketConnection("localhost", 1337)

    @async_test
    def test_sends_handshake_after_connect(self):
        yield from self.socket_connection.connect()
        self.assertTrue(self.mock_server.received_handshake())

这个解决方案可能会遗漏一些边缘情况。

我认为应该将这样的工具添加到 Python 的标准库中,以使开箱即用的交互更加方便asynciounittest

于 2014-04-12T22:14:14.520 回答
51

async_test,由 Marvin Killing 建议,绝对可以提供帮助 - 以及直接调用loop.run_until_complete()

但我也强烈建议为每个测试重新创建新的事件循环,并直接将循环传递给 API 调用(至少asyncio它本身接受loop每个需要它的调用的仅关键字参数)。

喜欢

class Test(unittest.TestCase):
    def setUp(self):
        self.loop = asyncio.new_event_loop()
        asyncio.set_event_loop(None)

    def test_xxx(self):
        @asyncio.coroutine
        def go():
            reader, writer = yield from asyncio.open_connection(
                '127.0.0.1', 8888, loop=self.loop)
            yield from asyncio.sleep(0.01, loop=self.loop)
        self.loop.run_until_complete(go())

隔离测试用例中的测试并防止奇怪的错误,例如已经创建test_a但仅在test_b执行时完成的长期协程。

于 2014-05-13T21:55:32.657 回答
24

真的很像https://stackoverflow.com/a/23036785/350195async_test中提到的包装器,这里是 Python 3.5+ 的更新版本

def async_test(coro):
    def wrapper(*args, **kwargs):
        loop = asyncio.new_event_loop()
        try:
            return loop.run_until_complete(coro(*args, **kwargs))
        finally:
            loop.close()
    return wrapper



class TestSocketConnection(unittest.TestCase):
    def setUp(self):
        self.mock_server = MockServer("localhost", 1337)
        self.socket_connection = SocketConnection("localhost", 1337)

    @async_test
    async def test_sends_handshake_after_connect(self):
        await self.socket_connection.connect()
        self.assertTrue(self.mock_server.received_handshake())
于 2017-09-20T14:35:58.453 回答
18

pytest-asyncio看起来很有希望:

@pytest.mark.asyncio
async def test_some_asyncio_code():
    res = await library.do_something()
    assert b'expected result' == res
于 2016-08-31T06:36:08.323 回答
10

使用此类而不是unittest.TestCase基类:

import asyncio
import unittest


class AioTestCase(unittest.TestCase):

    # noinspection PyPep8Naming
    def __init__(self, methodName='runTest', loop=None):
        self.loop = loop or asyncio.get_event_loop()
        self._function_cache = {}
        super(AioTestCase, self).__init__(methodName=methodName)

    def coroutine_function_decorator(self, func):
        def wrapper(*args, **kw):
            return self.loop.run_until_complete(func(*args, **kw))
        return wrapper

    def __getattribute__(self, item):
        attr = object.__getattribute__(self, item)
        if asyncio.iscoroutinefunction(attr):
            if item not in self._function_cache:
                self._function_cache[item] = self.coroutine_function_decorator(attr)
            return self._function_cache[item]
        return attr


class TestMyCase(AioTestCase):

    async def test_dispatch(self):
        self.assertEqual(1, 1)

编辑1:

请注意@Nitay关于嵌套测试的答案。

于 2016-06-17T18:28:06.620 回答
9

您也可以使用aiounittest与@Andrew Svetlov、@Marvin Killing 答案类似的方法,并将其包装在易于使用的AsyncTestCase类中:

import asyncio
import aiounittest


async def add(x, y):
    await asyncio.sleep(0.1)
    return x + y

class MyTest(aiounittest.AsyncTestCase):

    async def test_async_add(self):
        ret = await add(5, 6)
        self.assertEqual(ret, 11)

    # or 3.4 way
    @asyncio.coroutine
    def test_sleep(self):
        ret = yield from add(5, 6)
        self.assertEqual(ret, 11)

    # some regular test code
    def test_something(self):
        self.assertTrue(true)

如您所见,异步情况由AsyncTestCase. 它还支持同步测试。有可能提供自定义事件循环,只需覆盖AsyncTestCase.get_event_loop.

如果您更喜欢(出于某种原因)其他 TestCase 类(例如unittest.TestCase),您可以使用async_test装饰器:

import asyncio
import unittest
from aiounittest import async_test


async def add(x, y):
    await asyncio.sleep(0.1)
    return x + y

class MyTest(unittest.TestCase):

    @async_test
    async def test_async_add(self):
        ret = await add(5, 6)
        self.assertEqual(ret, 11)
于 2017-08-18T07:58:22.753 回答
1

pylover 的答案是正确的,应该添加到 unittest IMO 中。

我会添加一点更改以支持嵌套异步测试:

class TestCaseBase(unittest.TestCase):
    # noinspection PyPep8Naming
    def __init__(self, methodName='runTest', loop=None):
        self.loop = loop or asyncio.get_event_loop()
        self._function_cache = {}
        super(BasicRequests, self).__init__(methodName=methodName)

    def coroutine_function_decorator(self, func):
        def wrapper(*args, **kw):
            # Is the io loop is already running? (i.e. nested async tests)
            if self.loop.is_running():
                t = func(*args, **kw)
            else:
                # Nope, we are the first
                t = self.loop.run_until_complete(func(*args, **kw))
            return t

        return wrapper

    def __getattribute__(self, item):
        attr = object.__getattribute__(self, item)
        if asyncio.iscoroutinefunction(attr):
            if item not in self._function_cache:
                self._function_cache[item] = self.coroutine_function_decorator(attr)
            return self._function_cache[item]
        return attr
于 2020-04-02T07:58:42.003 回答
1

我通常将我的异步测试定义为协程,并使用装饰器“同步”它们:

import asyncio
import unittest

def sync(coro):
    def wrapper(*args, **kwargs):
        loop = asyncio.get_event_loop()
        loop.run_until_complete(coro(*args, **kwargs))
    return wrapper

class TestSocketConnection(unittest.TestCase):
    def setUp(self):
        self.mock_server = MockServer("localhost", 1337)
        self.socket_connection = SocketConnection("localhost", 1337)

    @sync
    async def test_sends_handshake_after_connect(self):
        await self.socket_connection.connect()
        self.assertTrue(self.mock_server.received_handshake())
于 2017-07-28T22:08:19.160 回答
0

除了 pylover 的回答,如果您打算使用测试类本身的其他异步方法,以下实现会更好 -

import asyncio
import unittest

class AioTestCase(unittest.TestCase):

    # noinspection PyPep8Naming
    def __init__(self, methodName='runTest', loop=None):
        self.loop = loop or asyncio.get_event_loop()
        self._function_cache = {}
        super(AioTestCase, self).__init__(methodName=methodName)

    def coroutine_function_decorator(self, func):
        def wrapper(*args, **kw):
            return self.loop.run_until_complete(func(*args, **kw))
        return wrapper

    def __getattribute__(self, item):
        attr = object.__getattribute__(self, item)
        if asyncio.iscoroutinefunction(attr) and item.startswith('test_'):
            if item not in self._function_cache:
                self._function_cache[item] = 
                    self.coroutine_function_decorator(attr)
            return self._function_cache[item]
        return attr


class TestMyCase(AioTestCase):

    async def multiplier(self, n):
        await asyncio.sleep(1)  # just to show the difference
        return n*2

    async def test_dispatch(self):
        m = await self.multiplier(2)
        self.assertEqual(m, 4)

唯一的变化是 -and item.startswith('test_')__getattribute__方法中。

于 2020-01-13T10:30:37.823 回答