我想在 postgresql 中创建一个持久的作业队列。这样多个工作人员可以从队列中选择一项作业(使用select for update
with skip locked
),对其进行处理,然后将其从队列中删除。我有一张桌子:
create table queue (
id serial primary key
some_job_param1 text not null
some_job_param2 text not null
)
现在,如果有两个工作,那么它工作正常:
启动worker1
事务并选择第一个作业
begin;
select * from queue for update skip locked limit 1;
开始处理。worker2
做同样的事情并使用相同的查询选择第二个作业。
完成worker1
工作后,将其从队列中删除并提交事务:
delete from queue where id=$1;
commit;
然后worker1
它准备好接受新工作,所以它做同样的事情。开始新事务,选择未锁定的作业。但问题是,没有更多的工作,查询返回零行。
理想的情况是查询会阻塞,直到有新作业并返回结果。有可能吗?还是我走错了方向?
编辑:
工人是一个外部过程。因此,如果工作人员死亡,会话也会终止,事务也会终止。然后选定的作业将不再被锁定并准备好供其他工作人员使用。伪代码如下所示:
while (true) {
tx = db.new_transaction()
job_id, param1, param2 = tx.query("select * from queue for update skip locked limit 1")
try {
some_long_processing(param1, param2)
tx.commit()
} catch {
tx.rollback()
}
}