27

在我的网站中,用户可以随时更新他们的个人资料(手动),或者每天自动更新一次。

这个任务现在与 celery 一起分发。

但我有一个“问题”:

每天,在自动更新中,一项作业将所有用户(+-6k 用户)放入队列:

from celery import group
from tasks import *
import datetime
from lastActivityDate.models import UserActivity

today   = datetime.datetime.today()
one_day = datetime.timedelta(days=5)
today -= one_day

print datetime.datetime.today()

user_list = UserActivity.objects.filter(last_activity_date__gte=today)
g = group(update_user_profile.s(i.user.auth.username) for i in user_list)

print datetime.datetime.today()
print g(user_list.count()).get()

如果有人尝试手动更新,他们将进入队列并永远执行。

有没有办法将此手动任务设置为以优先方式运行?或者为每个单独的队列制作一个专用的:手动和自动?

4

2 回答 2

39

Celery 不支持任务优先级。(v3.0)

http://docs.celeryproject.org/en/master/faq.html#does-celery-support-task-priorities

您可以通过路由任务来解决此问题。

http://docs.celeryproject.org/en/latest/userguide/routing.html

准备 default 和 priority_high 队列。

from kombu import Queue
CELERY_DEFAULT_QUEUE = 'default'
CELERY_QUEUES = (
    Queue('default'),
    Queue('priority_high'),
)

运行两个守护进程。

user@x:/$ celery worker -Q priority_high
user@y:/$ celery worker -Q default,priority_high

和路线任务。

your_task.apply_async(args=['...'], queue='priority_high')
于 2013-04-05T06:16:07.673 回答
6

如果您使用RabbitMQ传输,请按以下方式配置队列: settings.py

from kombu import Queue
...
CELERY_TASK_QUEUES = (
    Queue('default', routing_key='task_default.#', max_priority=10), 
    ...)

然后运行你的任务:

my_low_prio_task.apply_async(args=(...), priority=1)
my_high_prio_task.apply_async(args=(...), priority=10)

目前此代码适用于kombu==4.6.11,celery==4.4.6。

于 2021-08-12T08:08:25.913 回答