从未尝试过,但我认为应该可以在 celery 任务中调用 luigi 任务表单,就像你从 python 代码中做的一样:
from foobar import MyTask
from luigi import scheduler
task = MyTask(123, 'another parameter value')
sch = scheduler.CentralPlannerScheduler()
w = worker.Worker(scheduler=sch)
w.add(task)
w.run()
关于扩展你的队列和 celery 工作者:如果你有太多的 celery 工作者调用 luigi 任务,它当然需要你扩展你的 luigi 调度程序/守护进程,以便它可以处理 API 请求的数量(每次调用要执行的任务时) ,你每 N 秒点击一次 luigi 调度程序 API - 这取决于你的配置 - 你的任务将点击调度程序 API 说“我还活着”,每次任务完成时 - 错误或成功 - 你点击调度程序 API , 等等)。
所以,是的,仔细查看你的调度程序,看看它是否接收到太多的 http 请求,或者它的数据库是否是一个瓶颈(luigi 默认使用一个 sqlite,但你可以轻松地将其更改为 mysql o postgres)。
更新:
自版本 2.7.0起,luigi.scheduler.CentralPlannerScheduler
已重命名luigi.scheduler.Scheduler
为您可能在此处看到的,所以上面的代码现在应该是:
from foobar import MyTask
from luigi import scheduler
task = MyTask(123, 'another parameter value')
sch = scheduler.Scheduler()
w = worker.Worker(scheduler=sch)
w.add(task)
w.run()