3

我有一个分布式任务队列,其任务如下所示:

# creates a uniquely-named file
new_path = do_work()

old_path = database.query('select old path')
unlink(old_path)

database.query('insert new path')

这里有一个竞争条件:如果任务队列软件同时启动其中两个任务,它们将从old_path数据库中得到相同的结果,并且竞争失败者的取消链接调用失败(将失败者的新路径从未来取消链接中孤立出来) )。

有没有办法可以构建这个来绕过这场比赛?如果需要,我可以从当前的设计中丢弃任何东西。具体来说,我使用的是 PostgreSQL、Python 和 Celery。我知道我可能可以使用表范围的锁定/将 psycopg2 的事务级别更改为 SERIALIZABLE,但我不确定是否可以避免这种竞争条件。表级锁定还意味着我必须为每个附加任务引入一个新表(以免它们相互阻塞),这听起来不太吸引人。

4

3 回答 3

5

我强烈建议您研究已经解决此问题的工具,例如PGQ。排队比你想象的要难得多。这不是你想重新发明的轮子。

并发难

米海的回答看起来表面上很好,但在并发操作中却有所下降。

两个并发 UPDATE 可以选择同一行(在他的示例中具有used_flag = FALSE)。其中一个将获得锁并继续。另一个将等到第一次运行并提交。当提交发生时,第二次更新将获得锁,重新检查它的条件,找到不再匹配的行,并且什么都不做。因此,除了一组并发更新中的一个之外,所有并发更新都可能返回一个空集。

READ COMMITTED模式下,您仍然可以获得不错的结果,大致相当于单个会话UPDATE连续循环。在SERIALIZABLE模式下,它将无可救药地失败。尝试一下; 这是设置:

CREATE TABLE paths (
    used_flag boolean not null default 'f',
    when_entered timestamptz not null default current_timestamp,
    data text not null
);

INSERT INTO paths (data) VALUES
('aa'),('bb'),('cc'),('dd');

这是演示。尝试三个并发会话,逐步进行。在 READ COMMITTED 中执行一次,然后在所有会话SERIALIZABLE中使用BEGIN ISOLATION LEVEL SERIALIZABLE而不是 plain再次执行BEGIN。比较结果。

SESSION 1             SESSION2         SESSION 3

BEGIN;
                                       BEGIN;

UPDATE      paths
    SET     used_flag = TRUE
    WHERE   used_flag = FALSE
    RETURNING data;

                      BEGIN;

                      INSERT INTO
                      paths(data)
                      VALUES
                      ('ee'),('ff');      

                      COMMIT;               
                                       UPDATE      paths
                                           SET     used_flag = TRUE
                                           WHERE   used_flag = FALSE
                                           RETURNING data;


                      BEGIN;

                      INSERT INTO
                      paths(data)
                      VALUES
                      ('gg'),('hh');      

                      COMMIT;        

COMMIT;

READ COMMITTED第一个 UPDATE 成功并产生四行。第二个产生剩余的两个ee,并ff在第一次更新运行后插入并提交。gg并且hh不会被第二次更新返回,即使它在提交后实际执行也是如此,因为它已经选择了它的行并且在插入它们时正在等待锁定。

孤立地,SERIALIZABLE第一次 UPDATE 成功并产生四行。第二个失败了ERROR: could not serialize access due to concurrent update。在这种情况下SERIALIZABLE,隔离不会帮助你,它只会改变失败的性质。

UPDATE如果没有显式事务,当s 并发运行时也会发生同样的事情。如果您使用显式事务,则无需摆弄时间就可以更轻松地进行演示。

只选一排怎么样?

如上所述,系统工作正常,但是如果您只想获取最旧的行怎么办?因为UPDATE在阻塞锁之前选择了要操作的行,所以您会发现在任何给定的事务集中只有一个UPDATE会返回结果。

您会想到以下技巧:

UPDATE      paths
    SET     used_flag = TRUE
    WHERE entry_id = (
        SELECT entry_id
        FROM paths 
        WHERE used_flag = FALSE
        ORDER BY when_entered
        LIMIT 1
    )
    AND used_flag = FALSE
    RETURNING data;

或者

UPDATE      paths
    SET     used_flag = TRUE
    WHERE entry_id = (
        SELECT min(entry_id)
        FROM paths 
        WHERE used_flag = FALSE
    )
    AND used_flag = FALSE
    RETURNING data;

但这些不会按您的预期工作;当同时运行时,两者都会选择相同的目标行。一个会继续,一个会阻塞锁直到第一次提交,然后继续并返回一个空结果。如果没有第二个AND used_flag = FALSE,我认为他们甚至可以返回重复项!在上面的演示表中添加一entry_id SERIAL PRIMARY KEY列后尝试。paths为了让他们参加比赛,就LOCK TABLE paths在第三节;请参阅我在以下链接中给出的示例。

我在另一个答案中写了这些问题,并且在我的答案中多线程会导致受约束集上的重复更新

说真的,去看看PGQ。它已经为你解决了这个问题。

于 2012-09-17T23:21:45.080 回答
1

不要选择旧路径,而是执行以下操作:

old_path = database.query('
    UPDATE      paths
        SET     used_flag = TRUE
        WHERE   used_flag = FALSE
        RETURNS data');

RETURNS子句允许您从刚刚更新的行(/deleted/inserted)中“选择”值。

指定该used_flag行是否已被另一个 Python 实例使用。使用该WHERE used_flag = FALSE位将确保您没有选择已经使用过的东西。

于 2012-09-17T20:03:20.080 回答
0

如果任务队列软件能够为请求提供唯一标识符,那么也许您可以将每个请求的 old_path 存储在不同的行中。如果没有,也许您可​​以为每个请求生成一个密钥并用它存储路径。

于 2012-09-17T20:03:33.860 回答