您可以编写自定义加载程序,也可以使用信号。
Loaders 有on_task_init
方法,在任务即将执行时on_worker_init
调用,由 celery+celerybeat 主进程调用。
使用信号可能是最简单的,可用的信号有:
0.8.x:
task_prerun(task_id, task, args, kwargs)
当任务即将由工作人员执行时调度(如果使用apply
/ 或CELERY_ALWAYS_EAGER
已设置,则在本地执行)。
task_postrun(task_id, task, args, kwargs, retval)
在与上述相同的条件下执行任务后调度。
task_sent(task_id, task, args, kwargs, eta, taskset)
应用任务时调用(不适合长时间运行的操作)
0.9.x 中可用的其他信号(github 上的当前主分支):
这是一个在进程中第一次运行任务时预先计算某些内容的示例:
from celery.task import Task
from celery.registry import tasks
from celery.signals import task_prerun
_precalc_table = {}
class PowersOfTwo(Task):
def run(self, x):
if x in _precalc_table:
return _precalc_table[x]
else:
return x ** 2
tasks.register(PowersOfTwo)
def _precalc_numbers(**kwargs):
if not _precalc_table: # it's empty, so haven't been generated yet
for i in range(1024):
_precalc_table[i] = i ** 2
# need to use registered instance for sender argument.
task_prerun.connect(_precalc_numbers, sender=tasks[PowerOfTwo.name])
如果您希望为所有任务运行该函数,只需跳过该sender
参数。