3

我有一个这样的模型

class Thingy(models.Model):
    # ...
    failures_count = models.IntegerField()

我有需要执行此操作的并发进程(芹菜任务):

  1. 做某种处理
  2. failures_counter如果处理失败相应的增量Thingy
  3. 如果failures_counter超过 some 的阈值Thingy,则发出警告,但只有一个警告。

我有一些关于如何在没有竞争条件的情况下做到这一点的想法,例如使用显式锁(通过select_for_update):

@transaction.commit_on_success
def report_failure(thingy_id):
    current, = (Thingy.objects
               .select_for_update()
               .filter(id=thingy_id)
               .values_list('failures_count'))[0]
    if current == THRESHOLD:
        issue_warning_for(thingy_id)
    Thingy.objects.filter(id=thingy_id).update(
        failures_count=F('failures_count') + 1
    )

或者通过使用 Redis(它已经存在)进行同步:

@transaction.commit_on_success
def report_failure(thingy_id):
    Thingy.objects.filter(id=thingy_id).update(
        failures_count=F('failures_count') + 1
    )
    value = Thingy.objects.get(id=thingy_id).only('failures_count').failures_count
    if value >= THRESHOLD:
        if redis.incr('issued_warning_%s' % thingy_id) == 1:
            issue_warning_for(thingy_id)

两种解决方案都使用锁。当我使用 PostgreSQL 时,有没有办法在不锁定的情况下实现这一点?


我正在编辑问题以包含答案(感谢 Sean Vieira,请参阅下面的答案)。这个问题询问了一种避免锁定的方法,这个答案是最佳的,因为它利用了 PostgreSQL 实现的多版本并发控制(MVCC)

这个特定问题明确允许使用 PostgreSQL 功能,尽管许多 RDBMS 实现UPDATE ... RETURNING了 ,但它不是标准 SQL,并且 Django 的 ORM 不支持开箱即用,因此它需要使用原始 SQL via raw()。相同的 SQL 语句将在其他 RDBMS 中工作,但每个引擎都需要自己讨论同步、事务隔离和并发模型(例如,带有 MyISAM 的 MySQL 仍将使用锁)。

def report_failure(thingy_id):
    with transaction.commit_on_success():
        failure_count = Thingy.objects.raw("""
            UPDATE Thingy
            SET failure_count = failure_count + 1
            WHERE id = %s
            RETURNING failure_count;
        """, [thingy_id])[0].failure_count

    if failure_count == THRESHOLD:
        issue_warning_for(thingy_id)
4

2 回答 2

5

据我所知,Django 的 ORM 不支持开箱即用 - 但是,这并不意味着它不能完成,您只需要深入到 SQL 级别(在 Django 的 ORM 中通过Managerraw方法)使其工作。

如果您使用的是 PostgresSQL >= 8.2,那么您可以使用它RETURNING来获取最终值,failure_count而无需任何额外的锁定(数据库仍将锁定,但只需要足够长的时间来设置值,不会失去与您通信的额外时间):

# ASSUMPTIONS: All IDs are valid and IDs are unique
# More defenses are necessary if either of these assumptions
# are not true.
failure_count = Thingy.objects.raw("""
    UPDATE Thingy
    SET failure_count = failure_count + 1
    WHERE id = %s
    RETURNING failure_count;
""", [thingy_id])[0].failure_count

if failure_count == THRESHOLD:
    issue_warning_for(thingy_id)
于 2013-05-14T02:42:28.397 回答
0

我真的不知道你必须在没有锁定的情况下完成这项工作的原因,你有多少任务同时运行?

但是,我认为有一种方法可以做到这一点,而无需像这样锁定:

你应该有另一个模型,例如失败:

class Failure(models.Model):
    thingy = models.ForeignKey(Thingy)

你的 *report_failure* 应该是这样的:

from django.db import transaction
@transaction.commit_manually
def flush_transaction():
    transaction.commit()

@transaction.commit_on_success
def report_failure(thingy_id):
    thingy = Thingy.objects.get(id=thingy_id)
    #uncomment following line if you found that the query is cached (not get updated result)
    #flush_transaction()

    current = thingy.failure_set.count()
    if current >= THRESHOLD:
        issue_warning_for(thingy_id)
    Failure.objects.create(thingy=thingy)

我知道这种方法很糟糕,因为它会创建很多失败记录。但这是我能想出的唯一想法。对于那个很抱歉。

于 2013-05-12T22:42:56.667 回答