43

这可能是我第十次实施这样的事情,而且我从未对自己提出的解决方案感到 100% 满意。

使用 mysql 表而不是“适当的”消息传递系统的原因很有吸引力,主要是因为大多数应用程序已经使用一些关系数据库来处理其他东西(对于我一直在做的大部分事情,这往往是 mysql),而很少有应用程序使用消息系统。此外 - 关系数据库具有非常强的 ACID 属性,而消息传递系统通常没有。

第一个想法是使用:

创建表作业(
  id auto_increment 不是 null 主键,
  消息文本不为空,
  process_id varbinary(255) null 默认 null,
  关键jobs_key(process_id)
);

然后入队看起来像这样:

插入工作(消息)值('blah blah');

出队看起来像这样:

开始;
select * from jobs where process_id is null order by id asc limit 1;
更新作业集 process_id = ? 其中 id = ?; ——无论我得到什么
犯罪;
-- 返回 (id, message) 到应用程序,完成后清理

表和入队看起来不错,但出队有点困扰我。回滚的可能性有多大?还是被封杀?我应该使用什么键使它成为 O(1)-ish?

还是有比我正在做的更好的解决方案?

4

8 回答 8

29

你的出队可能更简洁。您可以在没有显式事务的情况下在一个原子语句中完成,而不是依赖事务回滚:

UPDATE jobs SET process_id = ? WHERE process_id IS NULL ORDER BY ID ASC LIMIT 1;

然后,您可以使用(括号 [] 表示可选,具体取决于您的具体情况)来拉取作业:

SELECT * FROM jobs WHERE process_id = ? [ORDER BY ID LIMIT 1];
于 2011-09-05T03:55:05.963 回答
8

我已经建立了一些消息队列系统,我不确定你指的是什么类型的消息,但在出队的情况下(这是一个词吗?)我做了你做过的同样的事情. 你的方法看起来简单、干净、可靠。并不是说我的工作是最好的,但事实证明它对于许多站点的大型监控非常有效。(错误记录、大量电子邮件营销活动、社交网络通知)

我的投票:不用担心!

于 2009-01-08T05:26:23.870 回答
7

Brian Aker不久前谈到了队列引擎。也有关于SELECT table FROM DELETE语法的讨论。

如果您不担心吞吐量,您可以随时使用SELECT GET_LOCK()作为互斥锁。例如:

SELECT GET_LOCK('READQUEUE');
SELECT * FROM jobs;
DELETE FROM JOBS WHERE ID = ?;
SELECT RELEASE_LOCK('READQUEUE');

如果你想真正花哨,把它包装在一个存储过程中。

于 2009-01-08T04:29:49.410 回答
4

在 MySQL 8 中,您可以使用 newNOWAITSKIP LOCKED关键字来避免特殊锁​​定机制的复杂性:

START TRANSACTION;
SELECT id, message FROM jobs
 WHERE process_id IS NULL
 ORDER BY id ASC LIMIT 1
 FOR UPDATE SKIP LOCKED;
UPDATE jobs
 SET process_id = ?
 WHERE id = ?;
COMMIT;

传统上,如果没有 hack 和不寻常的特殊表或列、不可靠的解决方案或失去并发性,这很难实现。

SKIP LOCKED可能会导致大量消费者出现性能问题。

然而,这仍然不能处理在事务回滚时自动标记作业完成。为此,您可能需要保存点。然而,这可能无法解决所有情况。您真的想设置一个操作以在事务失败时执行,但作为事务的一部分!

将来可能会有更多功能来帮助优化案例,例如也可以返回匹配行的更新。随时了解更改日志中的新特性和功能非常重要。

于 2019-12-03T19:40:54.657 回答
2

这是我使用的一个解决方案,在没有当前线程的 process_id 的情况下工作,或者锁定表。

SELECT * from jobs ORDER BY ID ASC LIMIT 0,1;

在 $row 数组中获取结果,然后执行:

DELETE from jobs WHERE ID=$row['ID'];

然后获取受影响的行(mysql_affected_rows)。如果存在受影响的行,则处理 $row 数组中的作业。如果有 0 个受影响的行,则表示其他进程已经在处理选定的作业。重复上述步骤,直到没有行。

我已经使用具有 100k 行的“作业”表对此进行了测试,并产生了 20 个执行上述操作的并发进程。没有发生比赛条件。您可以修改上述查询以使用处理标志更新行,并在实际处理后删除该行:

while(time()-$startTime<$timeout)
{
SELECT * from jobs WHERE processing is NULL ORDER BY ID ASC LIMIT 0,1;
if (count($row)==0) break;
UPDATE jobs set processing=1 WHERE ID=$row['ID'];
if (mysql_affected_rows==0) continue;
//process your job here
DELETE from jobs WHERE ID=$row['ID'];
}

不用说,您应该使用适当的消息队列(ActiveMQ、RabbitMQ 等)来完成此类工作。不过,我们不得不求助于这个解决方案,因为我们的主机在更新软件时经常会破坏东西,所以破坏的东西越少越好。

于 2016-04-15T17:57:06.003 回答
1

我建议使用Quartz.NET

它有 SQL Server、Oracle、MySql、SQLite 和 Firebird 的提供程序。

于 2009-01-08T03:11:31.083 回答
1

该线程具有应可映射的设计信息。

去引用:

这是我过去成功使用的:

MsgQueue 表架构

MsgId 身份 -- NOT NULL
MsgTypeCode varchar(20) -- NOT NULL
SourceCode varchar(20) -- 插入消息的进程 -- NULLable
State char(1) -- 'N'new if queued, 'A'(ctive) if处理,'C'完成,默认'N' -- NOT NULL
CreateTime datetime -- 默认 GETDATE() -- NOT NULL
Msg varchar(255) -- NULLable

您的消息类型是您所期望的 - 消息符合插入过程和读取过程之间的约定,使用 XML 或您选择的其他表示形式进行结构化(在某些情况下 JSON 会很方便,例如实例)。

然后可以插入 0 到 n 进程,并且可以读取和处理消息的 0 到 n 进程,每个读取进程通常处理单个消息类型。一个进程类型的多个实例可以运行以实现负载平衡。

阅读器拉出一条消息并将状态更改为“A”ctive,同时它正在处理它。完成后,它将状态更改为“C”完成。它可以删除或不删除消息,具体取决于您是否要保留审计跟踪。State = 'N' 的消息按 MsgType/Timestamp 顺序拉取,因此在 MsgType + State + CreateTime 上有一个索引。

变化:
“E”错误的状态。
Reader 进程代码列。
状态转换的时间戳。

这为执行您所描述的许多事情提供了一个很好的、可扩展的、可见的、简单的机制。如果您对数据库有基本的了解,那么它非常简单且可扩展。由于原子状态转换事务,锁回滚等从来没有问题。

于 2009-01-08T05:16:42.843 回答
0

您可以有一个中间表来维护队列的偏移量。

create table scan(
  scan_id int primary key,
  offset_id int
);

您可能还会进行多次扫描,因此每次扫描有一个偏移量。在扫描开始时初始化 offset_id = 0。

begin;
select * from jobs where order by id where id > (select offset_id from scan where scan_id = 0)  asc limit 1;
update scan set offset_id = ? where scan_id = ?; -- whatever i just got
commit;

您需要做的只是保持最后一个偏移量。这也将为您节省大量空间(每条记录的 process_id)。希望这听起来合乎逻辑。

于 2019-03-28T12:26:35.320 回答