47

有没有办法在 Postgresql 中选择未锁定的行?我有一个多线程应用程序可以执行以下操作:

Select... order by id desc limit 1 for update

在一张桌子上。

如果多个线程运行此查询,它们都会尝试拉回同一行。

一个获得行锁,另一个阻塞,然后在第一个更新行后失败。我真正想要的是让第二个线程获得与WHERE子句匹配且尚未锁定的第一行。

为了澄清,我希望每个线程在选择后立即更新第一个可用行。

因此,如果有带有 的行ID: 1,2,3,4,第一个线程会进来,选择带有 的行ID=4并立即更新它。

如果在该事务期间出现第二个线程,我希望它获取行ID=3并立即更新该行。

For Share 不会完成此操作,nowait因为WHERE子句将与锁定的行匹配(ID=4 in my example)。基本上我想要的是WHERE子句中的“AND NOT LOCKED”之类的东西。

Users

-----------------------------------------
ID        | Name       |      flags
-----------------------------------------
1         |  bob       |        0
2         |  fred      |        1
3         |  tom       |        0
4         |  ed        |        0

如果查询是“ Select ID from users where flags = 0 order by ID desc limit 1”并且当返回一行时,下一个是“ Update Users set flags = 1 where ID = 0”,那么我希望第一个线程用 来抓取行,ID 4下一个线程用 来抓取行ID 3

如果我将“”附加For Update到选择,那么第一个线程获取该行,第二个线程阻塞然后什么都不返回,因为一旦第一个事务提交,该WHERE子句就不再满足。

如果我不使用“ For Update”,那么我需要在后续更新中添加一个 WHERE 子句(WHERE flags = 0),这样只有一个线程可以更新该行。

第二个线程将选择与第一个线程相同的行,但第二个线程的更新将失败。

无论哪种方式,第二个线程都无法获取行并更新,因为我无法让数据库将第 4 行提供给第一个线程,将第 3 行提供给第二个线程,事务重叠。

4

14 回答 14

33

此功能SELECT ... SKIP LOCKED正在 Postgres 9.5 中实现。http://www.depesz.com/2014/10/10/waiting-for-9-5-implement-skip-locked-for-row-level-locks/

于 2014-10-10T23:19:18.360 回答
9

不不不 :-)

我知道作者的意思。我有类似的情况,我想出了一个很好的解决方案。首先,我将从描述我的情况开始。我有一个表我存储必须在特定时间发送的消息。PG 不支持函数的定时执行,所以我们必须使用守护进程(或 cron)。我使用一个自定义编写的脚本来打开几个并行进程。每个进程选择一组必须以 +1 秒 / -1 秒的精度发送的消息。表本身会使用新消息动态更新。

所以每个进程都需要下载一组行。这组行不能被其他进程下载,因为它会造成很多混乱(有些人会收到几条消息,而他们应该只收到一条消息)。这就是我们需要锁定行的原因。使用锁下载一组消息的查询:

FOR messages in select * from public.messages where sendTime >= CURRENT_TIMESTAMP - '1 SECOND'::INTERVAL AND sendTime <= CURRENT_TIMESTAMP + '1 SECOND'::INTERVAL AND sent is FALSE FOR UPDATE LOOP
-- DO SMTH
END LOOP;

每 0.5 秒启动一个带有此查询的进程。所以这将导致下一个查询等待第一个锁来解锁行。这种方法会造成巨大的延迟。即使我们使用NOWAIT,查询也会导致我们不想要的异常,因为表中可能有必须发送的新消息。如果仅使用 FOR SHARE 查询将正确执行,但仍将花费大量时间造成巨大的延迟。

为了让它工作,我们做了一个小魔术:

  1. 更改查询:

    FOR messages in select * from public.messages where sendTime >= CURRENT_TIMESTAMP - '1 SECOND'::INTERVAL AND sendTime <= CURRENT_TIMESTAMP + '1 SECOND'::INTERVAL AND sent is FALSE AND is_locked(msg_id) IS FALSE FOR SHARE LOOP
    -- DO SMTH
    END LOOP;
    
  2. 神秘的函数“is_locked(msg_id)”如下所示:

    CREATE OR REPLACE FUNCTION is_locked(integer) RETURNS BOOLEAN AS $$
    DECLARE
        id integer;
        checkout_id integer;
        is_it boolean;
    BEGIN
        checkout_id := $1;
        is_it := FALSE;
    
        BEGIN
            -- we use FOR UPDATE to attempt a lock and NOWAIT to get the error immediately 
            id := msg_id FROM public.messages WHERE msg_id = checkout_id FOR UPDATE NOWAIT;
            EXCEPTION
                WHEN lock_not_available THEN
                    is_it := TRUE;
        END;
    
        RETURN is_it;
    
    END;
    $$ LANGUAGE 'plpgsql' VOLATILE COST 100;
    

当然,我们可以自定义此功能以处理您数据库中的任何表。在我看来,最好为一张表创建一个检查功能。向此函数添加更多内容只会使其变慢。无论如何,我需要更长的时间来检查这个条款,所以没有必要让它变得更慢。对我来说,这是完整的解决方案,而且效果很好。

现在,当我让我的 50 个进程并行运行时,每个进程都有一组独特的新消息要发送。发送后,我只需使用 sent = TRUE 更新该行,并且永远不会再返回它。

我希望这个解决方案也适用于你(作者)。如果您有任何问题,请告诉我:-)

哦,让我知道这是否也对你有用。

于 2010-07-14T02:52:32.287 回答
6

我使用这样的东西:

select  *
into l_sms
from sms
where prefix_id = l_prefix_id
    and invoice_id is null
    and pg_try_advisory_lock(sms_id)
order by suffix
limit 1;

并且不要忘记调用 pg_advisory_unlock

于 2010-12-25T14:02:01.967 回答
5

如果您正在尝试实现队列,请查看 PGQ,它已经解决了这个问题和其他问题。http://wiki.postgresql.org/wiki/PGQ_Tutorial

于 2010-12-25T18:28:49.467 回答
2

看来您正在尝试执行诸如获取队列中尚未被另一个进程处理的最高优先级项之类的操作。

一个可能的解决方案是添加一个 where 子句,将其限制为未处理的请求:

select * from queue where flag=0 order by id desc for update;
update queue set flag=1 where id=:id;
--if you really want the lock:
select * from queue where id=:id for update;
...

希望第二个事务会在标志更新发生时阻塞,然后它将能够继续,但标志会将其限制为下一个。

也有可能使用可序列化的隔离级别,您可以获得您想要的结果,而无需所有这些疯狂。

根据应用程序的性质,可能有比在数据库中更好的实现方式,例如 FIFO 或 LIFO 管道。此外,可能会颠倒您需要它们的顺序,并使用顺序来确保按顺序处理它们。

于 2009-02-03T16:17:57.657 回答
1

这可以通过 SELECT ...NOWAIT; 来完成。一个例子是here

于 2008-12-23T17:56:01.693 回答
1

我的解决方案是使用带有 RETURNING 子句的 UPDATE 语句。

Users

-----------------------------------
ID        | Name       |      flags
-----------------------------------
1         |  bob       |        0  
2         |  fred      |        1  
3         |  tom       |        0   
4         |  ed        |        0   

而不是SELECT .. FOR UPDATE使用

BEGIN; 

UPDATE "Users"
SET ...
WHERE ...;
RETURNING ( column list );

COMMIT;

因为 UPDATE 语句在表上获得了 ROW EXCLUSIVE 锁,所以它的更新是序列化的更新。仍然允许读取,但它们只能在 UPDATE 事务开始之前看到数据。

参考: Pg 文档的并发控制章节。

于 2011-04-11T04:24:21.683 回答
0

看起来您正在寻找 SELECT FOR SHARE。

http://www.postgresql.org/docs/8.3/interactive/sql-select.html#SQL-FOR-UPDATE-SHARE

FOR SHARE 的行为类似,除了它在每个检索到的行上获取共享锁而不是排他锁。共享锁阻止其他事务对这些行执行 UPDATE、DELETE 或 SELECT FOR UPDATE,但不会阻止它们执行 SELECT FOR SHARE。

如果在 FOR UPDATE 或 FOR SHARE 中命名了特定的表,那么只有来自这些表的行被锁定;SELECT 中使用的任何其他表都照常读取。没有表列表的 FOR UPDATE 或 FOR SHARE 子句会影响命令中使用的所有表。如果将 FOR UPDATE 或 FOR SHARE 应用于视图或子查询,它会影响视图或子查询中使用的所有表。

如果需要为不同的表指定不同的锁定行为,可以编写多个 FOR UPDATE 和 FOR SHARE 子句。如果 FOR UPDATE 和 FOR SHARE 子句都提到(或隐式影响)同一个表,则将其作为 FOR UPDATE 处理。类似地,如果在影响它的任何子句中指定了表,则将其作为 NOWAIT 处理。

FOR UPDATE 和 FOR SHARE 不能用在返回的行不能用单个表行清楚地标识的上下文中;例如,它们不能与聚合一起使用。

于 2008-12-23T17:50:19.300 回答
0

你想达到什么目的?你能更好地解释为什么解锁的行更新和完整的事务都不会做你想要的吗?

更好的是,您能否防止争用并让每个线程使用不同的偏移量?如果表的相关部分经常更新,这将无法正常工作;您仍然会发生碰撞,但仅在插入负载较重的情况下。

Select... order by id desc offset THREAD_NUMBER limit 1 for update
于 2008-12-23T18:00:40.950 回答
0

由于我还没有找到更好的答案,我决定在我的应用程序中使用锁定来同步对执行此查询的代码的访问。

于 2008-12-23T20:02:25.213 回答
0

我在我们的应用程序中遇到了同样的问题,并提出了一个与 Grant Johnson 的方法非常相似的解决方案。FIFO 或 LIFO 管道不是一个选项,因为我们有一组应用程序服务器访问一个数据库。我们所做的是一个

SELECT ... WHERE FLAG=0 ... FOR UPDATE
紧随其后的是
UPDATE ... SET FLAG=1 WHERE ID=:id
尽快,以使锁定时间尽可能短。根据表列数和大小,仅在第一次选择中获取 ID 并标记行以获取剩余数据后,这可能会有所帮助。存储过程可以进一步减少往返次数。

于 2009-06-24T20:41:19.967 回答
0

下面的呢?它可能比其他示例更原子地处理,但仍应进行测试以确保我的假设没有错。

UPDATE users SET flags = 1 WHERE id = ( SELECT id FROM users WHERE flags = 0 ORDER BY id DESC LIMIT 1 ) RETURNING ...;

您可能仍然会被 postgres 内部使用的任何锁定方案所困,以在面对同时更新时提供一致的 SELECT 结果。

于 2008-12-24T17:48:46.637 回答
0

^^ 那行得通。考虑具有“锁定”的“立即”状态。

假设你的桌子是这样的:

编号 | 姓名 | 姓氏 | 地位

例如,可能的状态是:1=待处理、2=锁定、3=已处理、4=失败、5=拒绝

每条新记录都以待处理状态插入(1)

您的程序会:“更新 mytable set status = 2 where id = (select id from mytable where name like '%John%' and status = 1 limit 1) returns id, name, surname”

然后你的程序做它的事情,如果它得出这个线程根本不应该处理该行的结论,它会:“update mytable set status = 1 where id = ?”

除此之外,它会更新到其他状态。

于 2009-12-07T12:40:33.897 回答
0

在多线程和集群中使用?
这个怎么样?

START TRANSACTION;

// All thread retrive same task list
// If result count is very big, using cursor 
//    or callback interface provied by ORM frameworks.
var ids = SELECT id FROM tableName WHERE k1=v1;

// Each thread get an unlocked recored to process.
for ( id in ids ) {
   var rec = SELECT ... FROM tableName WHERE id =#id# FOR UPDATE NOWAIT;
   if ( rec != null ) {
    ... // do something
   }
}

COMMIT;
于 2011-11-17T07:21:52.323 回答