3

我们有一个系统,它有一个基于数据库的队列,用于在线程中而不是实时处理项目。目前在Mybatis中实现,在mysql中调用这个存储过程:

DROP PROCEDURE IF EXISTS pop_invoice_queue;
DELIMITER ;;
CREATE PROCEDURE pop_invoice_queue(IN compId int(11), IN limitRet int(11)) BEGIN

   SELECT LAST_INSERT_ID(id) as value, InvoiceQueue.* FROM InvoiceQueue 
      WHERE companyid = compId 
      AND (lastPopDate is null OR lastPopDate < DATE_SUB(NOW(), INTERVAL 3 MINUTE)) LIMIT limitRet FOR UPDATE;
   UPDATE InvoiceQueue SET lastPopDate=NOW() WHERE id=LAST_INSERT_ID(); 

END;;

DELIMITER ;

问题是这会从队列中弹出 N 个项目,但只会更新从队列中弹出的最后一个项目的 lastPopDate 值。因此,如果我们使用 limitRet = 5 调用这个存储过程,它将从队列中弹出五个项目并开始处理它们,但只有第五个项目将设置 lastPopDate,因此当下一个线程到来并从队列中弹出时,它将获取项目1-4 和第 6 项。

我们怎样才能让它更新从数据库中“弹出”的所有 N 条记录?

4

2 回答 2

2

如果您愿意BIGINT通过以下方式向表中添加字段:

ALTER TABLE InvoiceQueue
ADD uuid BIGINT NULL DEFAULT NULL,
INDEX ix_uuid (uuid);

然后您可以先进行更新,然后通过以下方式选择更新的记录:

CREATE PROCEDURE pop_invoice_queue(IN compId int(11), IN limitRet int(11))
BEGIN
   SET @uuid = UUID_SHORT();

   UPDATE InvoiceQueue
   SET    uuid = @uuid,
          lastPopDate = NOW()
   WHERE  companyid = compId
   AND    uuid IS NULL 
   AND    (lastPopDate IS NULL OR lastPopDate < NOW() - INTERVAL 3 MINUTE)
   ORDER BY
          id
   LIMIT  limitRet;

   SELECT * 
   FROM   InvoiceQueue 
   WHERE  uuid = @uuid
   FOR    UPDATE;
END;;

对于UUID_SHORT()返回唯一值的函数,每台机器每秒调用它的次数不应超过 1600 万次。访问here了解更多详情。

出于性能考虑,您可能希望将lastPopDate字段更改为,NOT NULL因为该OR子句将导致您的查询不使用索引,即使索引可用:

ALTER TABLE InvoiceQueue
MODIFY lastPopDate DATETIME NOT NULL DEFAULT '0000-00-00';

然后,如果您还没有,您可以在 // 字段上添加一个索引companyidlastPopDate如下uuid所示:

ALTER TABLE InvoiceQueue
ADD INDEX ix_company_lastpop (companyid, lastPopDate, uuid);

然后,您可以从查询中删除该OR子句:UPDATE

   UPDATE InvoiceQueue
   SET    uuid = @uuid,
          lastPopDate = NOW()
   WHERE  companyid = compId 
   AND    lastPopDate < NOW() - INTERVAL 3 MINUTE
   ORDER BY
          id
   LIMIT  limitRet;

这将使用您刚刚创建的索引。

于 2013-01-01T03:29:20.477 回答
0

由于 mysql 既没有收集也没有输出/返回子句,我的建议是使用临时表。就像是 :

CREATE TEMPORARY TABLE temp_data 
SELECT LAST_INSERT_ID(id) as value, InvoiceQueue.* FROM InvoiceQueue 
  WHERE companyid = compId 
  AND (lastPopDate is null OR lastPopDate < DATE_SUB(NOW(), INTERVAL 3 MINUTE)) LIMIT limitRet FOR UPDATE;  
UPDATE InvoiceQueue 
INNER JOIN temp_data ON (InvoiceQueue.PKColumn = temp_data.PKColumn)
SET lastPopDate=NOW();
SELECT * FROM temp_data ;
DROP TEMPORARY TABLE temp_data;

此外,我推测这select ... for update可能会导致死锁(当然,如果从不同的会话调用该过程) - 据我所知,不能保证行被锁定的顺序(即使你有order by,行也可能以不同的顺序被锁定)。我建议仔细检查文档。

于 2013-01-01T02:53:24.010 回答