3

我正在为我的网络应用程序使用芹菜。Celery 执行父任务,然后执行进一步的任务管道

芹菜的问题

  1. 我无法获得使用 luigi 获得的依赖关系图和可视化工具,以查看我的父任务的状态

  2. Celery 不提供重新启动失败管道并从失败处开始的机制。

这两件事我可以很容易地从 luigi 那里得到。

所以我在想,一旦 celery 运行父任务,然后在该任务中我执行 Luigi 管道。

是否会有任何问题,即我需要根据 queuesize 自动缩放芹菜工人。这会影响多台机器上的任何 luigi 工人吗?

4

1 回答 1

7

从未尝试过,但我认为应该可以在 celery 任务中调用 luigi 任务表单,就像你从 python 代码中做的一样:

from foobar import MyTask
from luigi import scheduler

task = MyTask(123, 'another parameter value')
sch = scheduler.CentralPlannerScheduler()
w = worker.Worker(scheduler=sch)
w.add(task)
w.run()

关于扩展你的队列和 celery 工作者:如果你有太多的 celery 工作者调用 luigi 任务,它当然需要你扩展你的 luigi 调度程序/守护进程,以便它可以处理 API 请求的数量(每次调用要执行的任务时) ,你每 N 秒点击一次 luigi 调度程序 API - 这取决于你的配置 - 你的任务将点击调度程序 API 说“我还活着”,每次任务完成时 - 错误或成功 - 你点击调度程序 API , 等等)。

所以,是的,仔细查看你的调度程序,看看它是否接收到太多的 http 请求,或者它的数据库是否是一个瓶颈(luigi 默认使用一个 sqlite,但你可以轻松地将其更改为 mysql o postgres)。

更新

版本 2.7.0起,luigi.scheduler.CentralPlannerScheduler已重命名luigi.scheduler.Scheduler您可能在此处看到的,所以上面的代码现在应该是:

from foobar import MyTask
from luigi import scheduler

task = MyTask(123, 'another parameter value')
sch = scheduler.Scheduler()
w = worker.Worker(scheduler=sch)
w.add(task)
w.run()
于 2016-05-09T14:44:18.527 回答