21

我已经对此进行了大量研究,但我很惊讶我还没有在任何地方找到一个好的答案。

我在 Heroku 上运行一个大型应用程序,并且我有某些 celery 任务运行很长时间处理,并在任务结束时保存结果。每次我在 Heroku 上重新部署时,它都会发送 SIGTERM(最终发送 SIGKILL)并杀死我正在运行的工作人员。我试图找到一种方法让工作实例优雅地关闭自己并重新排队以供稍后处理,以便最终我们可以保存所需的结果而不是丢失排队的任务。

我找不到让工作人员正确收听 SIGTERM 的方法。我得到的最接近的,它在python manage.py celeryd直接运行时有效,但在使用工头模拟 Heroku 时无效,如下所示:

@app.task(bind=True, max_retries=1)
def slow(self, x):
    try:
        for x in range(100):
            print 'x: ' + unicode(x)
            time.sleep(10)
    except exceptions.MaxRetriesExceededError:
        logger.error('whoa')
    except (exceptions.WorkerShutdown, exceptions.WorkerTerminate) as exc:
        logger.error(u'retrying, ' + unicode(exc))
        raise self.retry(exc=exc, countdown=10)
    except (KeyboardInterrupt, SystemExit) as exc:
        print 'retrying'
        raise self.retry(exc=exc, countdown=10)
    else:
        return x
    finally:
        logger.info('task ended!')

当我开始在工头中运行这个 celery 任务并按 Ctrl+C 时,会发生以下情况:

^CSIGINT received
22:20:59 system   | sending SIGTERM to all processes
22:20:59 web.1    | exited with code 0
22:21:04 system   | sending SIGKILL to all processes
Killed: 9

所以很明显,没有一个 celery 异常,也没有我在其他帖子中看到的KeyboardInterruptorSystemExit异常,正确地捕获了 SIGTERM 并关闭了工作程序。

这样做的正确方法是什么?

4

3 回答 3

2

从版本 >= 4 开始,Celery 带有一个特殊功能,仅适用于 Heroku,它支持开箱即用的此功能:

$ REMAP_SIGTERM=SIGQUIT celery -A proj worker -l info

来源:https ://devcenter.heroku.com/articles/celery-heroku#using-remap_sigterm

于 2019-08-08T14:45:50.400 回答
1

不幸的是,celery 的设计目的不是清洁关机。曾经。我是认真的。celery 工作人员响应 SIGTERM 但如果任务不完整,工作进程将等待完成任务,然后才退出。在这种情况下,如果工人没有在合理的时间内关闭,您可以向它发送 SIGKILL,但在这种情况下会丢失信息,即您可能不知道哪些作业仍未完成。

于 2015-04-27T04:44:49.170 回答
0

您可以使用acks_latetask_acks_late

任务将在任务完成执行而不是之前从队列中确认。因此,如果工作人员正常关闭,任务将重新生成。

于 2017-12-13T07:23:02.657 回答