0

我有一个简单的过程,我需要处理表的记录,并且理想情况下运行该过程的多个实例而不处理相同的记录。我使用 MySQL 完成此操作的方式相当普遍(尽管我认为 token 字段更像是一种 hack):

向表中添加几个字段:

CREATE TABLE records (
    id INTEGER PRIMARY KEY AUTO_INCREMENT,
    ...actual fields...

    processed_at DATETIME DEFAULT NULL,
    process_token TEXT DEFAULT NULL
);

然后是一个简单的处理脚本:

process_salt = md5(rand()) # or something like a process id

def get_record():
    token = md5(microtime + process_salt)
    db.exec("UPDATE records SET process_token = ?
             WHERE processed_at IS NULL LIMIT 1", token)
    return db.exec("SELECT * FROM records WHERE token = ?", token)

while (row = get_record()) is valid:
    # ...do processing on row...

    db.exec("UPDATE records SET processed_at = NOW(), token = NULL
             WHERE id = ?", row.id)

我正在使用 PostgreSQL 数据库的系统中实现这样的过程。我知道 Pg 在锁定方面可能被认为比 MySQL 更成熟,这要归功于 MVCC——我可以在 Pg 中使用行锁定或其他一些功能来代替令牌字段吗?

4

2 回答 2

1

这种方法适用于 PostgreSQL,但它往往效率很低,因为您要更新每一行两次——每次更新需要两个事务,两个提交。commit_delay使用 a并可能禁用可以在一定程度上减轻此成本synchronous_commit,但除非您的存储子系统上有非易失性回写缓存,否则它仍然不会很快。

更重要的是,因为您正在提交第一次更新,所以无法区分仍在工作的工人和已经崩溃的工人之间的区别。如果所有工作人员都在本地计算机上,您可能可以将令牌设置为工作人员的进程 ID,然后偶尔扫描丢失的 PID,但这很麻烦且容易出现竞争条件,更不用说 pid 重用的问题了。

我建议您采用真正的排队解决方案,旨在解决这些问题,如 ActiveMQ、RabbitMQ、ZeroMQ 等。PGQ也可能很重要。

在事务性关系数据库中进行队列处理应该很容易,但在实践中,要做好并正确处理是非常困难的。大多数一目了然的“解决方案”实际上是对所有工作进行序列化(因此在任何给定时间,只有许多队列工作人员中的一个在做任何事情)在详细检查时。

于 2013-04-10T23:48:03.427 回答
0

你可以使用SELECT ... FOR UPDATE NOWAITwhich 将获得该行的排他锁,或者如果它已经被锁定则报告错误。

于 2013-04-11T03:35:13.570 回答