25

我有一个包含约 50K 行的数据库表,每行代表一项需要完成的工作。我有一个程序从数据库中提取工作,完成工作并将结果放回数据库。(这个系统现在正在运行)

现在我想允许一个以上的处理任务来完成工作,但要确保没有任务被完成两次(作为性能问题而不是这会导致其他问题)。因为访问是通过存储过程进行的,所以我目前的做法是用看起来像这样的东西替换所述存储过程

update tbl 
set owner = connection_id() 
where available and owner is null limit 1;

select stuff 
from tbl 
where owner = connection_id();

顺便提一句; 工人的任务可能会在获得工作和提交结果之间失去联系。此外,除非我把那部分搞砸(每分钟约 5 个工作),否则我不认为数据库会接近瓶颈

这有什么问题吗?有一个更好的方法吗?

注意:“数据库作为 IPC 反模式”在这里只是稍微恰当,因为

  1. 我没有做 IPC(没有生成行的进程,它们现在都已经存在)和
  2. 针对该反模式描述的主要抱怨是,当进程等待消息时,它会导致数据库上出现不必要的负载(在我的情况下,如果没有消息,一切都可以在完成后关闭)
4

6 回答 6

36

在关系数据库系统中实现作业队列的最佳方式是使用SKIP LOCKED.

SKIP LOCKED是一种锁获取选项,适用于读/共享 ( FOR SHARE) 或写/独占 ( FOR UPDATE) 锁,现在得到广泛支持:

  • Oracle 10g 及更高版本
  • PostgreSQL 9.5 及更高版本
  • SQL Server 2005 及更高版本
  • MySQL 8.0 及更高版本

现在,考虑我们有下post表:

发布表

status列用作Enum,具有以下值:

  • PENDING(0),
  • APPROVED(1),
  • SPAM(2)。

如果我们有多个并发用户尝试审核post记录,我们需要一种方法来协调他们的努力,以避免让两个审核者审核同一post行。

所以,SKIP LOCKED这正是我们所需要的。如果两个并发用户 Alice 和 Bob 执行以下 SELECT 查询,这些查询将独占锁定帖子记录,同时还添加了SKIP LOCKED选项:

[Alice]:
SELECT
    p.id AS id1_0_,1
    p.body AS body2_0_,
    p.status AS status3_0_,
    p.title AS title4_0_
FROM
    post p
WHERE
    p.status = 0
ORDER BY
    p.id
LIMIT 2
FOR UPDATE OF p SKIP LOCKED
 
[Bob]:                                                                                                                                                                                                              
SELECT
    p.id AS id1_0_,
    p.body AS body2_0_,
    p.status AS status3_0_,
    p.title AS title4_0_
FROM
    post p
WHERE
    p.status = 0
ORDER BY
    p.id
LIMIT 2
FOR UPDATE OF p SKIP LOCKED

我们可以看到 Alice 可以选择前两个条目,而 Bob 选择接下来的 2 条记录。如果没有SKIP LOCKED,Bob 的锁获取请求将被阻塞,直到 Alice 释放前 2 条记录的锁。

于 2019-04-16T07:04:24.867 回答
14

这是我过去成功使用的:

MsgQueue 表架构

MsgId identity -- NOT NULL
MsgTypeCode varchar(20) -- NOT NULL  
SourceCode varchar(20)  -- process inserting the message -- NULLable  
State char(1) -- 'N'ew if queued, 'A'(ctive) if processing, 'C'ompleted, default 'N' -- NOT NULL 
CreateTime datetime -- default GETDATE() -- NOT NULL  
Msg varchar(255) -- NULLable  

您的消息类型是您所期望的 - 符合插入过程和读取过程之间的约定的消息,使用 XML 或您选择的其他表示形式进行结构化(在某些情况下 JSON 会很方便,例如实例)。

然后可以插入 0 到 n 进程,并且可以读取和处理消息的 0 到 n 进程,每个读取进程通常处理单个消息类型。一个进程类型的多个实例可以运行以实现负载平衡。

阅读器拉出一条消息并将状态更改为“A”ctive,同时它正在处理它。完成后,它将状态更改为“C”完成。它可以删除或不删除消息,具体取决于您是否要保留审计跟踪。State = 'N' 的消息按 MsgType/Timestamp 顺序拉取,因此在 MsgType + State + CreateTime 上有一个索引。

变化:
“E”错误的状态。
Reader 进程代码列。
状态转换的时间戳。

这为执行您所描述的许多事情提供了一个很好的、可扩展的、可见的、简单的机制。如果您对数据库有基本的了解,那么它非常简单且可扩展。


评论中的代码:

CREATE PROCEDURE GetMessage @MsgType VARCHAR(8) ) 
AS 
DECLARE @MsgId INT 

BEGIN TRAN 

SELECT TOP 1 @MsgId = MsgId 
FROM MsgQueue 
WHERE MessageType = @pMessageType AND State = 'N' 
ORDER BY CreateTime


IF @MsgId IS NOT NULL 
BEGIN 

UPDATE MsgQueue 
SET State = 'A' 
WHERE MsgId = @MsgId 

SELECT MsgId, Msg 
FROM MsgQueue 
WHERE MsgId = @MsgId  
END 
ELSE 
BEGIN 
SELECT MsgId = NULL, Msg = NULL 
END 

COMMIT TRAN
于 2008-11-17T23:31:54.090 回答
0

与其不拥有 owner = null ,不如将其设置为假的 nobody 记录。搜索 null 不会限制索引,您最终可能会进行表扫描。(这是针对 oracle 的,SQL server 可能不同)

于 2008-11-17T23:13:34.830 回答
0

就像可能的技术变化一样,您可能会考虑使用 MSMQ 或类似的东西。

您的每个作业/线程都可以查询消息队列以查看是否有新作业可用。因为读取消息的行为会将其从堆栈中删除,所以您可以确保只有一个作业/线程会收到消息。

当然,这是假设您使用的是 Microsoft 平台。

于 2008-11-17T23:55:19.503 回答
0

有关上下文,请参阅 Vlad 的答案,我只是在 Oracle 中添加等价物,因为有一些“陷阱”需要注意。

SELECT * FROM t order by x limit 2 FOR UPDATE OF t SKIP LOCKED

不会以您可能期望的方式直接转换为 Oracle。如果我们看一些翻译选项,我们可能会尝试以下任何一种:

SQL> create table t as
  2   select rownum x
  3   from dual
  4   connect by level <= 100;

Table created.

SQL> declare
  2    rc sys_refcursor;
  3  begin
  4    open rc for select * from t order by x for update skip locked fetch first 2 rows only;
  5  end;
  6  /
  open rc for select * from t order by x for update skip locked fetch first 2 rows only;
                                                                *
ERROR at line 4:
ORA-06550: line 4, column 65:
PL/SQL: ORA-00933: SQL command not properly ended
ORA-06550: line 4, column 15:
PL/SQL: SQL Statement ignored

SQL> declare
  2    rc sys_refcursor;
  3  begin
  4    open rc for select * from t order by x fetch first 2 rows only for update skip locked ;
  5  end;
  6  /
declare
*
ERROR at line 1:
ORA-02014: cannot select FOR UPDATE from view with DISTINCT, GROUP BY, etc.
ORA-06512: at line 4

或者尝试回退到 ROWNUM 选项

SQL> declare
  2    rc sys_refcursor;
  3  begin
  4    open rc for select * from ( select * from t order by x ) where rownum <= 10 for update skip locked;
  5  end;
  6  /
declare
*
ERROR at line 1:
ORA-02014: cannot select FOR UPDATE from view with DISTINCT, GROUP BY, etc.
ORA-06512: at line 4

你不会得到任何快乐。因此,您需要自己控制“n”行的获取。因此,您可以编写如下代码:

SQL> declare
  2    rc sys_refcursor;
  3    res1 sys.odcinumberlist := sys.odcinumberlist();
  4  begin
  5    open rc for select * from t order by x for update skip locked;
  6    fetch rc bulk collect into res1 limit 10;
  7  end;
  8  /

PL/SQL procedure successfully completed.
于 2020-10-26T09:23:12.570 回答
-6

您正在尝试实施 de“数据库作为 IPC”反模式。查看它以了解为什么您应该考虑正确地重新设计您的软件。

于 2008-11-17T23:06:28.183 回答