celery.events.state.State()
是一种用于跟踪 celery 工人和任务状态的数据结构。调用 时State()
,您会得到一个没有数据的空状态对象。
您应该使用app.events.Receiver
(流处理)或celery.events.snapshot
(批处理)来捕获包含任务的状态。
示例代码:
from celery import Celery
def my_monitor(app):
state = app.events.State()
def announce_failed_tasks(event):
state.event(event)
# task name is sent only with -received event, and state
# will keep track of this for us.
task = state.tasks.get(event['uuid'])
print('TASK FAILED: %s[%s] %s' % (
task.name, task.uuid, task.info(),))
with app.connection() as connection:
recv = app.events.Receiver(connection, handlers={
'task-failed': announce_failed_tasks,
'*': state.event,
})
recv.capture(limit=None, timeout=None, wakeup=True)
if __name__ == '__main__':
app = Celery(broker='amqp://guest@localhost//')
my_monitor(app)