2

在 Celery 中,最终重新加载模块的唯一方法是重新启动所有 celery 进程。我知道如何远程关闭工作人员(broadcast('shutdown', destination = [<workers>])),但不知道如何让他们恢复。

我有一段 Python 代码可以用来创建一个包含新工作者的守护进程,但是当我尝试在 Celery 中将它作为 celery 任务运行时,我得到了AssertionError: daemonic processes are not allowed to have children,我猜这与工人池已建立。

  1. 有没有办法以某种方式覆盖芹菜中的这个错误?
  2. 如果没有,是否有另一种方法可以让 Celery 启动另一个工人来代替自己?也许将整个事情包装在一个 bash 脚本中(尽管在 Python 中这样做的目的是避免使用 Python 调用 bash 来调用 Python)。
  3. 如果没有,还有其他方法可以说服 Celery 重新加载最新版本的代码吗?旗帜--autoreloadbroadcast('pool_restart')两者都不做。

示例行为:

  1. 创造:

    @task def add(x,y): 返回 x+y

  2. 加载 celery,运行 add.delay(4,4),返回 8。

  3. 更改添加到:

    @task def add(x,y): 返回 x*y

  4. 做魔术(目前是“重启芹菜”)

  5. 再次运行 add.delay(4,4)

你应该回到 16。我总是回到 8,除非我关闭 celery 并重新加载它,如果没有办法从远程机器上调出工作进程,我就无法远程完成,最好是使用脚本。

4

2 回答 2

2

在主管下运行 Celery 守护进程,例如 supervisord。当 celeryd 进程终止时,主管将把它重新启动。

于 2013-01-25T03:14:36.967 回答
0

如果您使用的是 Flask,则可以在带有子进程的 shell 中的另一个进程中打开 celery,并使用 Werkzeug 重新加载器重新启动文件:

if __name__ == "__main__":
    celery.control.broadcast("shutdown") # Kill the previous workers
    import subprocess
    subprocess.Popen("celery worker", shell=True) # Or celeryd
    app.run(use_reloader=True) # From Flask/Werkzeug

您不妨debug=True在 Flask/Werkzeug 中使用重新加载器(不是在生产服务器中执行的操作)。您还可以使用watchdog包监视更改。这在某种程度上是蛮力......但有效。

于 2013-05-29T02:39:47.620 回答