1

我使用 Celery 和 Redis 作为代理,我可以看到队列实际上是一个以序列化任务作为项目的 redis 列表。

我的问题是,如果我有一个 AsyncResult 对象作为调用的结果<task>.delay(),有没有办法确定该项目在队列中的位置?

更新:

我终于能够使用以下方法获得该职位:

from celery.task.control import inspect
i = inspect()
i.reserved()

但它有点慢,因为它需要与所有工人沟通。

4

1 回答 1

11

您提到的inspect.reserved()/scheduled()可能有效,但并不总是准确的,因为它只考虑了工作人员预取的任务。

Celery 不允许对队列进行带外操作,例如从队列中删除消息或重新排序它们,因为它不会在分布式系统中扩展。消息可能还没有到达队列,这可能导致竞争条件,实际上它不是具有事务操作的顺序队列,而是来自多个位置的消息流。也就是说,Celery API 基于严格的消息传递语义。

可以直接在 Celery 支持的一些代理(如 Redis 或数据库)上访问队列,但这不是公共 API 的一部分,不鼓励您这样做,但当然如果您不打算支持大规模操作你应该做任何对你来说最方便的事情,并放弃我的建议。

如果这只是为了让用户知道他的工作何时完成,那么我相信你可以想出一个算法来预测何时执行任务,如果你只有队列的长度和时间插入每个任务的位置。

第一个只是 a redis.len("celery"),而后者您可以通过收听task_sent信号来添加自己:

from celery.signals import task_sent

@task_sent.connect
def record_insertion_time(id, **kwargs):
   redis.zadd("celery.insertion_times", id)

在此处使用排序集:http ://redis.io/commands/zadd

对于纯消息传递解决方案,您可以使用专用监视器来使用 Celery 事件流并预测任务何时完成。 http://docs.celeryproject.org/en/latest/userguide/monitoring.html#event-reference

(刚刚注意到发送的任务缺少文档中的时间戳字段,但时间戳随该事件一起发送,因此我将修复它)。

事件还包含一个“时钟”字段,它是一个逻辑时钟(参见http://en.wikipedia.org/wiki/Lamport_timestamps),这可用于检测分布式系统中事件的顺序,而不依赖于系统每台机器上的时间同步(这是不可能实现的)。

于 2012-06-05T10:17:54.243 回答