0

我正在开发一个系统,该系统在为许多工作机器提供工作时必须处理许多竞争条件。

客户端将向系统查询 status='0' (ToDo) 的作业,然后以原子方式更新 status='1' (Locked) 的 'oldest' 行并检索该行的 id(用于更新带有工人信息的工作,例如正在使用哪台机器等)。

这里的主要问题是可能有任意数量的客户端同时更新。一种解决方案是锁定大约 20 个状态为“0”的行,更新最旧的行,然后再次释放所有锁。我一直在研究 TransactionMiddleware,但我不明白这将如何防止在我查询它后从我下面更新最旧的情况。

我已经研究了 QuerySet.update() 事情,它看起来很有希望,但是如果两个客户持有相同的记录,状态会简单地更新,我们会有两个工作人员从事同一个工作..我真的很茫然。

我还发现票#2705似乎可以很好地处理此案,但由于我有限的 SVN 经验,我不知道如何从那里获取代码(最后的更新只是差异,但我不知道如何合并与代码的主干)。

代码:结果 = 工作

class Result(models.Model):
"""
Result: completed- and pending runs

'ToDo': job hasn't been acquired by a client
'Locked': job has been acquired
'Paused'
"""
# relations
run = models.ForeignKey(Run)
input = models.ForeignKey(Input)

PROOF_CHOICES = (
    (1, 'Maybe'),
    (2, 'No'),
    (3, 'Yes'),
    (4, 'Killed'),
    (5, 'Error'),
    (6, 'NA'),
)
proof_status = models.IntegerField(
    choices=PROOF_CHOICES,
    default=6,
    editable=False)

STATUS_CHOICES = (
    (0, 'ToDo'),
    (1, 'Locked'),
    (2, 'Done'),
)
result_status = models.IntegerField(choices=STATUS_CHOICES, editable=False, default=0)

# != 'None' => status = 'Done'
proof_data = models.FileField(upload_to='results/',
    null=True, blank=True)
# part of the proof_data
stderr = models.TextField(editable=False,
    null=True, blank=True)

realtime = models.TimeField(editable=False,
    null=True, blank=True)
usertime = models.TimeField(editable=False,
    null=True, blank=True)
systemtime = models.TimeField(editable=False,
    null=True, blank=True)

# updated when client sets status to locked
start_time = models.DateTimeField(editable=False)

worker = models.ForeignKey('Worker', related_name='solved',
    null=True, blank=True)
4

3 回答 3

1

要将#2705 合并到您的 django 中,您需要先下载它:

cd <django-dir>
wget http://code.djangoproject.com/attachment/ticket/2705/for_update_11366_cdestigter.diff?format=raw

然后将 svn 倒回到必要的 django 版本:

svn update -r11366

然后应用它:

patch -p1 for_update_11366_cdestigter.diff

它将通知您哪些文件已成功修补,哪些未成功修补。在不太可能发生冲突的情况下,您可以手动修复它们http://code.djangoproject.com/attachment/ticket/2705/for_update_11366_cdestigter.diff

要取消应用补丁,只需编写

svn revert --recursive . 
于 2010-01-04T15:38:02.207 回答
1

如果您的 django 在一台机器上运行,有一种更简单的方法可以做到这一点......请原谅伪代码,因为您的实现细节尚不清楚。

from threading import Lock

workers_lock = Lock()

def get_work(request):
    workers_lock.acquire()
    try:
        # Imagine this method exists for brevity
        work_item = WorkItem.get_oldest()
        work_item.result_status = 1
        work_item.save()
    finally:
        workers_lock.release()

    return work_item
于 2010-01-04T16:56:12.813 回答
0

你有两个选择我的头顶。一种是在检索时立即锁定行,并且仅在适当的行被标记为正在使用时才释放锁定。这里的问题是没有其他客户端进程甚至可以查看没有被选中的作业。如果您总是自动选择最后一个,那么它可能是一个足够简短的窗口,对您来说可以。

另一种选择是恢复在查询时打开的行,但是在客户端尝试获取要使用的作业时再次检查。当客户端尝试更新作业以处理它时,首先会进行检查以查看它是否仍然可用。如果其他人已经抓住了它,那么通知将被发送回客户端。这允许所有客户端将所有作业视为快照,但如果他们不断获取最新的作业,那么您可能会让客户端不断收到作业已在使用中的通知。也许这就是您所指的竞争条件?

解决这个问题的一种方法是将特定组中的作业返回给客户,这样他们就不会总是得到相同的列表。例如,按地理区域甚至随机分解它们。例如,每个客户端的 ID 可以是 0 到 9。获取作业 ID 的 mod,并将具有相同结束数字的作业发送回客户端。但是不要将其限制在那些工作上,因为你不希望有你无法接触到的工作。因此,例如,如果您有 1、2 和 3 的客户以及 104 的工作,那么没有人能够得到它。因此,一旦没有足够的具有正确结尾数字的作业,作业就会开始以其他数字返回以填充列表。您可能需要在这里使用确切的算法,但希望这能给您一个想法。

如何锁定数据库中的行以更新它们和/或发回通知将在很大程度上取决于您的 RDBMS。在 MS SQL Server 中,只要中间不需要用户干预,您就可以将所有这些工作很好地包装在存储过程中。

我希望这有帮助。

于 2010-01-04T15:30:44.720 回答