8

我正在尝试使用 pytest-xdist 以使我的测试并行运行,问题是每个线程都将转到与所有测试共享的夹具并根据线程数执行它。

它给我带来了一个问题,因为该夹具角色是为我的测试创建数据,一旦它已经创建,我就会得到错误,因为它已经创建(通过 REST)。

conftest.py:

lock = threading.Lock()

@pytest.fixture(scope=session)
def init_data(request):

    lock.acquire()
    try:
        data = []
        if checking_data_not_created():
            data.append(some_rest_command_creating_data_1())
            data.append(some_rest_command_creating_data_2())
    finally:
        lock.release()

    yield data

    lock.acquire()
    try:
        remove_all_data()
    finally:
        lock.release()

测试类.py:

class classTests():

    def first_test(init_data):
        test body and validation....

     def second_test(init_data):
        test body and validation....

我正在使用命令:pytest -v -n2

假设第一个线程应该运行 first_test() 并且第二个线程应该运行 second_test() 其中一个总是会失败,因为第一个线程已经在夹具部分创建了数据,另一个线程将出现异常并且他应该运行的所有测试都将失败。

如您所见,我尝试使用 lock 来同步线程,但它也不起作用。

知道如何克服这个问题吗?

谢谢。

4

3 回答 3

4

这种方法不适用于 pytest-xdist,因为它使用多处理而不是多线程,但是它可以使用 --tests-per-worker 选项与pytest-parallel一起使用,它将使用多个线程运行测试。

在使用以下夹具的多线程 pytest 执行中,数据只会设置一次并清理一次:

conftest.py:

lock = threading.Lock()
threaded_count = 0

@pytest.fixture(scope='session')
def init_data():
    global lock
    global threaded_count

    lock.acquire()
    threaded_count += 1
    try:
        if threaded_count == 1:
            # Setup Data Once
            data = setup_data()
    finally:
        lock.release()

    yield data

    lock.acquire()
    threaded_count -= 1
    try:
        if threaded_count == 0:
            # Cleanup Data Once
            data = cleaup_data()
    finally:
        lock.release()

命令:

pytest -v --tests-per-worker 2

于 2019-05-23T11:36:54.090 回答
2

请注意, pytest-xdist 不支持'session'范围固定装置。pytest-xdist 中的'session'作用域固定装置实际上意味着特定于进程的会话级固定装置,因此它会为每个进程分别创建和拆除,并且固定装置的状态不会在进程之间共享。有一些长期存在的建议来为 pytest-xdist 添加真正的会话范围固定装置的锁定和共享,但它们都遇到了许多经典的理由,要不惜一切代价尝试避免多线程或多处理锁和同步,因此它被 pytest-xdist 开发人员严重降低了优先级(这是可以理解的)。

于 2019-07-26T14:52:32.640 回答
1

Python xdist有办法做到这一点

我相信您的示例,应用他们的建议如下所示:

def create_data():
    data = []
    data.append(some_rest_command_creating_data_1())
    data.append(some_rest_command_creating_data_2())

    return data


@pytest.fixture(scope="session")
def session_data(request, tmp_path_factory, worker_id):
    if worker_id == "master":
        # Not running multiple processes, just create the data.
        data = create_data()
    else:
        # Running multiple processes, manage lockfile.
        root_tmp_dir = tmp_path_factory.getbasetemp().parent

        fn = root_tmp_dir / "data.json"
        with FileLock(str(fn) + ".lock"):
            if fn.is_file():
                # Data has been created, read it in.
                data = json.loads(fn.read_text())
            else:
                # Data not created, create and write it out.
                data = create_data()
                fn.write_text(json.dumps(data))

    yield data

    remove_all_data()

与您的示例不同,此示例data通过保存在共享临时测试目录中tmp_path_factory.getbasetemp().parent锁定文件提供 pytest

于 2021-03-16T15:43:05.520 回答