9

我将 celery 2.4.1 与 python 2.6、rabbitmq 后端和 django 一起使用。如果工人关闭,我希望我的任务能够正确清理。据我所知,您无法提供任务析构函数,因此我尝试连接到worker_shutdown信号。

注意:AbortableTask仅适用于数据库后端,所以我不能使用它。

from celery.signals import worker_shutdown

@task
def mytask(*args)

  obj = DoStuff()

  def shutdown_hook(*args):
     print "Worker shutting down"
     # cleanup nicely
     obj.stop()

  worker_shutdown.connect(shutdown_hook)

  # blocking call that monitors a network connection
  obj.stuff()

但是,关闭挂钩永远不会被调用。Ctrl-C'ing 工人不会杀死任务,我必须从 shell 手动杀死它。

因此,如果这不是正确的方法,我该如何让任务优雅地关闭?

4

1 回答 1

11

worker_shutdown仅由 发送MainProcess,而不是子池工作人员发送。所有worker_*信号except for worker_process_init,请参阅MainProcess

但是,关闭挂钩永远不会被调用。Ctrl-C'ing 工人不会终止任务,我必须从 shell 手动终止它。

在正常(热)关闭的情况下,工作人员永远不会终止任务。即使一项任务需要数天才能完成,工作人员在完成之前不会完成关机。您可以设置--soft-time-limit, 或--time-limitto 告诉实例何时可以终止任务。

因此,要添加任何类型的进程清理过程,您首先需要确保任务能够真正完成。因为在此之前不会调用清理。

要向池工作进程添加清理步骤,您可以使用以下内容:

from celery import platforms
from celery.signals import worker_process_init

def cleanup_after_tasks(signum, frame):
    # reentrant code here (see http://docs.python.org/library/signal.html)

def install_pool_process_sighandlers(**kwargs):
    platforms.signals["TERM"] = cleanup_after_tasks
    platforms.signals["INT"] = cleanup_after_tasks

worker_process_init.connect(install_pool_process_sighandlers)
于 2011-11-22T16:47:36.793 回答