0

在试图了解 django 的 select_for_update 时,我创建了一个测试来查看。据我了解,如果有两个进程,一个在事务内部选择更新,另一个在事务结束之前尝试在其他进程中写入锁定的行,写入将失败。但是,以下测试通过了。有什么问题?

@pytest.mark.django_db(transaction=True)
def test_select_for_update():

    ExampleModel.objects.create(type="test1")
    ExampleModel.objects.create(type="test2")

    import os
    child_pid = os.fork()
    if child_pid == 0:
        sleep(5)
        qs = ExampleModel.objects.all()
        ExampleModel.objects.bulk_update(qs, ["type"])
    else:
        with transaction.atomic():
            qs = ExampleModel.objects.all().select_for_update()
            for obj in qs:
                obj.type = "test3"

            ExampleModel.objects.bulk_update(qs, ["type"])
            sleep(10)
4

0 回答 0