22

我目前正在使用 django 和芹菜,一切正常。

但是,如果服务器超载,我希望能够通过检查当前计划了多少任务,让用户有机会取消任务。

我怎样才能做到这一点?

我正在使用 redis 作为代理。

我刚刚发现: Retrieve list of tasks in a queue in Celery

它在某种程度上与我的问题有关,但我不需要列出任务,只需计算它们:)

4

5 回答 5

31

这是使用与代理无关的 celery 获取队列中消息数量的方法。

通过使用connection_or_acquire,您可以利用 celery 的内部连接池来最小化与您的代理的打开连接数。

celery = Celery(app)

with celery.connection_or_acquire() as conn:
    conn.default_channel.queue_declare(
        queue='my-queue', passive=True).message_count

您还可以扩展 Celery 以提供此功能:

from celery import Celery as _Celery


class Celery(_Celery)

    def get_message_count(self, queue):
        '''
        Raises: amqp.exceptions.NotFound: if queue does not exist
        '''
        with self.connection_or_acquire() as conn:
            return conn.default_channel.queue_declare(
                queue=queue, passive=True).message_count


celery = Celery(app)
num_messages = celery.get_message_count('my-queue')
于 2015-10-19T01:03:27.897 回答
25

如果你的 broker 配置为redis://localhost:6379/1,并且你的任务提交到通用celery队列,那么你可以通过以下方式获取长度:

import redis
queue_name = "celery"
client = redis.Redis(host="localhost", port=6379, db=1)
length = client.llen(queue_name)

或者,从 shell 脚本(适用于监视器等):

$ redis-cli -n 1 -h localhost -p 6379 llen celery
于 2013-10-02T04:09:26.240 回答
6

如果你已经在你的应用中配置了 redis,你可以试试这个:

from celery import Celery

QUEUE_NAME = 'celery'

celery = Celery(app)
client = celery.connection().channel().client

length = client.llen(QUEUE_NAME)
于 2015-09-11T07:36:01.953 回答
4

获取 Celery 使用的 redis 客户端实例,然后检查队列长度。每次使用时不要忘记释放连接(使用.acquire):

# Get a configured instance of celery:
from project.celery import app as celery_app

def get_celery_queue_len(queue_name):
    with celery_app.pool.acquire(block=True) as conn:
        return conn.default_channel.client.llen(queue_name)

始终从池中获取连接,不要手动创建它。否则,您的 redis 服务器将用完连接槽,这将杀死您的其他客户端。

于 2018-05-04T21:18:53.760 回答
1

我将围绕未找到错误扩展@StephenFuhry 的答案,因为即使 Celery 建议直接与经纪人混淆,检索队列长度的或多或少与经纪人无关的方式也是有益的。在 Celery 4(使用 Redis 代理)中,此错误如下所示:

ChannelError: Channel.queue_declare: (404) NOT_FOUND - no queue 'NAME' in vhost '/'

观察:

  1. ChannelError是一个kombu例外(如果事实上,它是amqp's 并kombu“重新导出”它)。

  2. 在 Redis 代理 Celery/Kombu 上,将队列表示为 Redis 列表

  3. 每当集合为空时, Redis 集合类型键就会被删除

  4. 如果我们看一下queue_declare,它有以下几行

    if passive and not self._has_queue(queue, **kwargs):
        raise ChannelError(...)
    
  5. Kombu Redis 虚拟传输_has_queue这样的:

    def _has_queue(self, queue, **kwargs):
        with self.conn_or_acquire() as client:
            with client.pipeline() as pipe:
                for pri in self.priority_steps:
                    pipe = pipe.exists(self._q_for_pri(queue, pri))
                return any(pipe.execute())
    

结论是,在从 RedisChannelError提出的代理上queue_declare是可以的(当然对于现有队列),只是意味着队列是空的。

这是一个如何输出所有活动 Celery 队列长度的示例(通常应该为 0,除非您的工作人员无法处理这些任务)。

from kombu.exceptions import ChannelError

def get_queue_length(name):
    with celery_app.connection_or_acquire() as conn: 
        try:
            ok_nt = conn.default_channel.queue_declare(queue=name, passive=True)
        except ChannelError:
            return 0
        else:
            return ok_nt.message_count
        
for queue_info in celery_app.control.inspect().active_queues().values():
    print(queue_info[0]['name'], get_queue_length(queue_info[0]['name']))
于 2020-08-17T18:18:31.583 回答