我已经将一个独立的批处理作业转换为使用 celery 来调度要完成的工作。我正在使用 RabbitMQ。一切都在一台机器上运行,没有其他进程正在使用 RabbitMQ 实例。我的脚本只是创建了一堆由工作人员处理的任务。
有没有一种简单的方法来测量从我的脚本开始到所有任务完成的时间?我知道在使用消息队列时这在设计上有点复杂。但我不想在生产中这样做,只是为了测试和获得性能估计。
您可以使用celery 信号,注册的函数将在任务执行前后被调用,测量经过的时间很简单:
from time import time
from celery.signals import task_prerun, task_postrun
d = {}
@task_prerun.connect
def task_prerun_handler(signal, sender, task_id, task, args, kwargs, **extras):
d[task_id] = time()
@task_postrun.connect
def task_postrun_handler(signal, sender, task_id, task, args, kwargs, retval, state, **extras):
try:
cost = time() - d.pop(task_id)
except KeyError:
cost = -1
print task.__name__, cost
您可以通过在末尾添加一个假任务来使用和弦,该任务将传递任务发送的时间,并将返回当前时间与执行时传递的时间之间的差异。
import celery
import datetime
from celery import chord
@celery.task
def dummy_task(res=None, start_time=None):
print datetime.datetime.now() - start_time
def send_my_task():
chord(my_task.s(), dummy_task.s(start_time=datetime.datetime.now()).delay()
send_my_task
发送您想要分析的任务以及dummy_task
打印花费多长时间(或多或少)的任务。如果您想要更准确的数字,我建议将 start_time 直接传递给您的任务,并使用信号。