60

我正在使用Celery来管理异步任务。然而,偶尔,celery 进程会停止,这会导致没有任何任务被执行。我希望能够检查 celery 的状态并确保一切正常,如果我检测到任何问题,则会向用户显示错误消息。从 Celery Worker 文档看来,我可以使用pinginspect来解决这个问题,但是 ping 感觉很笨拙,并且不清楚 inspect 到底是如何使用的(如果 inspect().registered() 为空?)。

对此的任何指导将不胜感激。基本上我正在寻找的是这样的方法:

def celery_is_alive():
    from celery.task.control import inspect
    return bool(inspect().registered()) # is this right??

编辑:它甚至不像 celery 2.3.3 上可用的 registered() (即使 2.1 文档列出了它)。也许 ping 是正确的答案。

编辑:Ping 似乎也没有像我想象的那样做,所以仍然不确定这里的答案。

4

9 回答 9

64

这是我一直在使用的代码。 celery.task.control.Inspect.stats()返回一个包含有关当前可用工作人员的大量详细信息的 dict,如果没有工作人员正在运行,则返回 None,IOError如果无法连接到消息代理,则引发 an。我正在使用 RabbitMQ - 其他消息传递系统的行为可能略有不同。这在 Celery 2.3.x 和 2.4.x 中有效;我不确定它可以追溯到多远。

def get_celery_worker_status():
    ERROR_KEY = "ERROR"
    try:
        from celery.task.control import inspect
        insp = inspect()
        d = insp.stats()
        if not d:
            d = { ERROR_KEY: 'No running Celery workers were found.' }
    except IOError as e:
        from errno import errorcode
        msg = "Error connecting to the backend: " + str(e)
        if len(e.args) > 0 and errorcode.get(e.args[0]) == 'ECONNREFUSED':
            msg += ' Check that the RabbitMQ server is running.'
        d = { ERROR_KEY: msg }
    except ImportError as e:
        d = { ERROR_KEY: str(e)}
    return d
于 2011-12-15T15:28:29.343 回答
13

celery 4.2 的文档中

from your_celery_app import app


def get_celery_worker_status():
    i = app.control.inspect()
    availability = i.ping()
    stats = i.stats()
    registered_tasks = i.registered()
    active_tasks = i.active()
    scheduled_tasks = i.scheduled()
    result = {
        'availability': availability,
        'stats': stats,
        'registered_tasks': registered_tasks,
        'active_tasks': active_tasks,
        'scheduled_tasks': scheduled_tasks
    }
    return result

当然,您可以/应该通过错误处理来改进代码...

于 2018-12-19T17:03:34.717 回答
10

要在 celery 作为守护进程运行的情况下使用命令行进行检查,

  • 激活 virtualenv 并转到“应用程序”所在的目录
  • 现在运行:celery -A [app_name] status
  • 它将显示 celery 是否已上涨或未上涨。在线节点数

资料来源: http: //michal.karzynski.pl/blog/2014/05/18/setting-up-an-asynchronous-task-queue-for-django-using-celery-redis/

于 2016-05-25T07:04:14.250 回答
5

以下对我有用:

import socket
from kombu import Connection

celery_broker_url = "amqp://localhost"

try:
    conn = Connection(celery_broker_url)
    conn.ensure_connection(max_retries=3)
except socket.error:
    raise RuntimeError("Failed to connect to RabbitMQ instance at {}".format(celery_broker_url))
于 2016-03-15T01:26:22.687 回答
3

测试是否有任何工作人员响应的一种方法是发送“ping”广播并在第一个响应时返回成功结果。

from .celery import app  # the celery 'app' created in your project

def is_celery_working():
    result = app.control.broadcast('ping', reply=True, limit=1)
    return bool(result)  # True if at least one result

这会广播“ping”并等待最多一秒钟的响应。一旦第一个响应进来,它就会返回一个结果。如果您想要False更快的结果,您可以添加一个timeout参数来减少它在放弃之前等待的时间。

于 2019-03-25T18:08:09.420 回答
1

我找到了一个优雅的解决方案:

from .celery import app
try:
    app.broker_connection().ensure_connection(max_retries=3)
except Exception as ex:
    raise RuntimeError("Failed to connect to celery broker, {}".format(str(ex)))
于 2021-08-07T20:55:03.723 回答
0

您可以使用ping方法检查任何工人(或特定工人)是否还活着
https://docs.celeryproject.org/en/latest/_modules/celery/app/control.html#Control.ping

celey_app.control.ping()

于 2019-10-01T19:17:15.447 回答
0

下面的脚本对我有用。

    #Import the celery app from project
    from application_package import app as celery_app
    def get_celery_worker_status():
        insp = celery_app.control.inspect()
        nodes = insp.stats()
        if not nodes:
            raise Exception("celery is not running.")
        logger.error("celery workers are: {}".format(nodes))
        return nodes
于 2020-10-01T14:45:34.707 回答
0

您可以通过运行以下命令在终端上进行测试。

celery -A proj_name worker -l INFO

您可以在每次运行 celery 时进行回顾。

于 2022-02-28T14:24:51.847 回答