有趣的问题,我认为使用广播命令应该很容易解决。如果当一个新工作人员启动时,它要求所有其他工作人员将其撤销的任务转储给新工作人员。添加两个新的远程控制命令,您可以通过使用轻松添加新命令@Panel.register
,
模块控制.py:
from celery.worker import state
from celery.worker.control import Panel
@Panel.register
def bulk_revoke(panel, ids):
state.revoked.update(ids)
@Panel.register
def broadcast_revokes(panel, destination):
panel.app.control.broadcast("bulk_revoke", arguments={
"ids": list(state.revoked)},
destination=destination)
将其添加到 CELERY_IMPORTS:
CELERY_IMPORTS = ("control", )
现在唯一缺少的问题是连接它,以便新工作人员broadcast_revokes
在启动时触发。我想你可以使用这个worker_ready
信号:
from celery import current_app as celery
from celery.signals import worker_ready
def request_revokes_at_startup(sender=None, **kwargs):
celery.control.broadcast("broadcast_revokes",
destination=sender.hostname)