2

我有一个提醒类型的应用程序,它使用“eta”参数在 celery 中安排任务。如果提醒对象中的参数发生变化(例如提醒时间),那么我会撤销之前发送的任务并将新任务排队。

我想知道是否有任何好的方法可以跟踪 celeryd 重新启动时撤销的任务。我希望能够动态地向上/向下扩展 celeryd 进程,并且似乎在发送 revoke 命令后启动的任何 celeryd 进程仍将执行该任务。

一种方法是保留已撤销任务 ID 的列表,但这种方法会导致列表任意增长。修剪这个列表需要保证任务不再在 RabbitMQ 队列中,这似乎是不可能的。

我还尝试为每个 celeryd 工人使用共享的 --statedb 文件,但似乎 stateb 文件仅在工人终止时更新,因此不适合我想要完成的任务。

提前致谢!

4

3 回答 3

2

有趣的问题,我认为使用广播命令应该很容易解决。如果当一个新工作人员启动时,它要求所有其他工作人员将其撤销的任务转储给新工作人员。添加两个新的远程控制命令,您可以通过使用轻松添加新命令@Panel.register

模块控制.py:

from celery.worker import state
from celery.worker.control import Panel

@Panel.register
def bulk_revoke(panel, ids):
    state.revoked.update(ids)

@Panel.register
def broadcast_revokes(panel, destination):
    panel.app.control.broadcast("bulk_revoke", arguments={
         "ids": list(state.revoked)},
         destination=destination)

将其添加到 CELERY_IMPORTS:

CELERY_IMPORTS = ("control", )

现在唯一缺少的问题是连接它,以便新工作人员broadcast_revokes在启动时触发。我想你可以使用这个worker_ready 信号:

from celery import current_app as celery
from celery.signals import worker_ready

def request_revokes_at_startup(sender=None, **kwargs):
    celery.control.broadcast("broadcast_revokes",
                             destination=sender.hostname)
于 2012-04-09T15:04:53.813 回答
0

我必须在我的项目中做类似的事情并celerycamdjango-admin-monitor. 监视器会拍摄任务快照并定期将它们保存在数据库中。并且有一个很好的用户界面来浏览和检查所有任务的状态。即使您的项目不是基于 Django 的,您甚至可以使用它。

于 2012-04-08T13:12:44.673 回答
0

前段时间我实现了类似的东西,我提出的解决方案与你的非常相似。

我解决这个问题的方法是让工作人员Task在作业运行时从数据库中获取对象(通过将主键传递给它,如文档建议的那样)。在您的情况下,在发送提醒之前,工作人员应执行检查以确保任务已“准备好”运行。如果不是,它应该简单地返回而不做任何工作(假设 ETA 已更改并且另一个工人将接手新工作)。

于 2012-04-08T13:12:58.283 回答