我目前正在使用 django 和芹菜,一切正常。
但是,如果服务器超载,我希望能够通过检查当前计划了多少任务,让用户有机会取消任务。
我怎样才能做到这一点?
我正在使用 redis 作为代理。
我刚刚发现: Retrieve list of tasks in a queue in Celery
它在某种程度上与我的问题有关,但我不需要列出任务,只需计算它们:)
我目前正在使用 django 和芹菜,一切正常。
但是,如果服务器超载,我希望能够通过检查当前计划了多少任务,让用户有机会取消任务。
我怎样才能做到这一点?
我正在使用 redis 作为代理。
我刚刚发现: Retrieve list of tasks in a queue in Celery
它在某种程度上与我的问题有关,但我不需要列出任务,只需计算它们:)
这是使用与代理无关的 celery 获取队列中消息数量的方法。
通过使用connection_or_acquire
,您可以利用 celery 的内部连接池来最小化与您的代理的打开连接数。
celery = Celery(app)
with celery.connection_or_acquire() as conn:
conn.default_channel.queue_declare(
queue='my-queue', passive=True).message_count
您还可以扩展 Celery 以提供此功能:
from celery import Celery as _Celery
class Celery(_Celery)
def get_message_count(self, queue):
'''
Raises: amqp.exceptions.NotFound: if queue does not exist
'''
with self.connection_or_acquire() as conn:
return conn.default_channel.queue_declare(
queue=queue, passive=True).message_count
celery = Celery(app)
num_messages = celery.get_message_count('my-queue')
如果你的 broker 配置为redis://localhost:6379/1
,并且你的任务提交到通用celery
队列,那么你可以通过以下方式获取长度:
import redis
queue_name = "celery"
client = redis.Redis(host="localhost", port=6379, db=1)
length = client.llen(queue_name)
或者,从 shell 脚本(适用于监视器等):
$ redis-cli -n 1 -h localhost -p 6379 llen celery
如果你已经在你的应用中配置了 redis,你可以试试这个:
from celery import Celery
QUEUE_NAME = 'celery'
celery = Celery(app)
client = celery.connection().channel().client
length = client.llen(QUEUE_NAME)
获取 Celery 使用的 redis 客户端实例,然后检查队列长度。每次使用时不要忘记释放连接(使用.acquire
):
# Get a configured instance of celery:
from project.celery import app as celery_app
def get_celery_queue_len(queue_name):
with celery_app.pool.acquire(block=True) as conn:
return conn.default_channel.client.llen(queue_name)
始终从池中获取连接,不要手动创建它。否则,您的 redis 服务器将用完连接槽,这将杀死您的其他客户端。
我将围绕未找到错误扩展@StephenFuhry 的答案,因为即使 Celery 建议直接与经纪人混淆,检索队列长度的或多或少与经纪人无关的方式也是有益的。在 Celery 4(使用 Redis 代理)中,此错误如下所示:
ChannelError: Channel.queue_declare: (404) NOT_FOUND - no queue 'NAME' in vhost '/'
观察:
ChannelError
是一个kombu
例外(如果事实上,它是amqp
's 并kombu
“重新导出”它)。
在 Redis 代理 Celery/Kombu 上,将队列表示为 Redis 列表
每当集合为空时, Redis 集合类型键就会被删除
如果我们看一下queue_declare
,它有以下几行:
if passive and not self._has_queue(queue, **kwargs):
raise ChannelError(...)
Kombu Redis 虚拟传输_has_queue
是这样的:
def _has_queue(self, queue, **kwargs):
with self.conn_or_acquire() as client:
with client.pipeline() as pipe:
for pri in self.priority_steps:
pipe = pipe.exists(self._q_for_pri(queue, pri))
return any(pipe.execute())
结论是,在从 RedisChannelError
提出的代理上queue_declare
是可以的(当然对于现有队列),只是意味着队列是空的。
这是一个如何输出所有活动 Celery 队列长度的示例(通常应该为 0,除非您的工作人员无法处理这些任务)。
from kombu.exceptions import ChannelError
def get_queue_length(name):
with celery_app.connection_or_acquire() as conn:
try:
ok_nt = conn.default_channel.queue_declare(queue=name, passive=True)
except ChannelError:
return 0
else:
return ok_nt.message_count
for queue_info in celery_app.control.inspect().active_queues().values():
print(queue_info[0]['name'], get_queue_length(queue_info[0]['name']))