在试图了解 django 的 select_for_update 时,我创建了一个测试来查看。据我了解,如果有两个进程,一个在事务内部选择更新,另一个在事务结束之前尝试在其他进程中写入锁定的行,写入将失败。但是,以下测试通过了。有什么问题?
@pytest.mark.django_db(transaction=True)
def test_select_for_update():
ExampleModel.objects.create(type="test1")
ExampleModel.objects.create(type="test2")
import os
child_pid = os.fork()
if child_pid == 0:
sleep(5)
qs = ExampleModel.objects.all()
ExampleModel.objects.bulk_update(qs, ["type"])
else:
with transaction.atomic():
qs = ExampleModel.objects.all().select_for_update()
for obj in qs:
obj.type = "test3"
ExampleModel.objects.bulk_update(qs, ["type"])
sleep(10)