根据 celery 关于实时监控 celery 工人的教程,还可以通过编程方式捕获工人产生的事件并采取相应的行动。
我的问题是如何在 Celery-Django 应用程序中集成一个监视器作为本示例中的监视器?
编辑:教程中的代码示例如下所示:
from celery import Celery
def my_monitor(app):
state = app.events.State()
def announce_failed_tasks(event):
state.event(event)
task_id = event['uuid']
print('TASK FAILED: %s[%s] %s' % (
event['name'], task_id, state[task_id].info(), ))
with app.connection() as connection:
recv = app.events.Receiver(connection, handlers={
'task-failed': announce_failed_tasks,
'worker-heartbeat': announce_dead_workers,
})
recv.capture(limit=None, timeout=None, wakeup=True)
if __name__ == '__main__':
celery = Celery(broker='amqp://guest@localhost//')
my_monitor(celery)
所以我想捕获工作人员发送的 task_failed 事件,并像教程所示那样获取它的 task_id,从为我的应用程序配置的结果后端获取此任务的结果并进一步处理它。我的问题是如何获取应用程序对我来说并不明显,因为在 django-celery 项目中,Celery 库的实例化对我来说并不透明。
我也愿意接受任何其他关于在工作人员完成执行任务时如何处理结果的想法。