30

我正在使用 Celery 执行异步后台任务,使用 Redis 作为后端。我对芹菜工人在以下情况下的行为感兴趣:

我正在使用celeryd. 该工作人员已通过-Q选项分配了两个队列以供使用:

celeryd -E -Q queue1,queue2

工人如何决定从哪里获取下一个要消费的任务?它会随机消耗来自queue1或的任务queue2吗?它是否会优先获取 from ,queue1因为它在传递给的参数列表中是第一个-Q

4

3 回答 3

16

根据我的测试,它处理多个队列round-robin 样式

如果我使用这个测试代码:

from celery import task
import time


@task
def my_task(item_id):
    time.sleep(0.5)
    print('Processing item "%s"...' % item_id)


def add_items_to_queue(queue_name, items_count):
    for i in xrange(0, items_count):
        my_task.apply_async(('%s-%d' % (queue_name, i),), queue=queue_name)


add_items_to_queue('queue1', 10)
add_items_to_queue('queue2', 10)
add_items_to_queue('queue3', 5)

并以(使用 django-celery)开始队列:

`manage.py celery worker -Q queue1,queue2,queue3`

它输出:

Processing item "queue1-0"...
Processing item "queue3-0"...
Processing item "queue2-0"...
Processing item "queue1-1"...
Processing item "queue3-1"...
Processing item "queue2-1"...
Processing item "queue1-2"...
Processing item "queue3-2"...
Processing item "queue2-2"...
Processing item "queue1-3"...
Processing item "queue3-3"...
Processing item "queue2-3"...
Processing item "queue1-4"...
Processing item "queue3-4"...
Processing item "queue2-4"...
Processing item "queue1-5"...
Processing item "queue2-5"...
Processing item "queue1-6"...
Processing item "queue2-6"...
Processing item "queue1-7"...
Processing item "queue2-7"...
Processing item "queue1-8"...
Processing item "queue2-8"...
Processing item "queue1-9"...
Processing item "queue2-9"...

因此,即使所有 queue1 任务都在 queue2和 3 任务之前 发布,它也会在继续下一个 queue1 项目之前从每个队列中提取一个项目。

注意:正如@WarLord 指出的那样,这个确切的行为只有在CELERYD_PREFETCH_MULTIPLIER设置为 1 时才有效。如果它大于 1,那么这意味着项目将从队列中批量获取。因此,如果您有 4 个进程的 PREFETCH_MULTIPLIER 设置为 4,这意味着将立即从队列中拉出 16 个项目,因此您不会得到上面的确切输出,但它仍将大致遵循循环.

于 2015-10-08T01:14:22.527 回答
7

注意:此答案已弃用:最新版本的 Celery 与 2013 年的工作方式大不相同......

消耗多个队列的工作人员消耗任务,FIFO 顺序也跨多个队列维护。

例子:

队列1:(t1,t2,t5,t7)
队列2:(t0,t3,t4,t6)

假设0-7代表发布任务的顺序

消费顺序为 t0, t1, t2, t3, t4, t5, t6, t7

于 2013-02-08T08:58:24.637 回答
2

使用指向 rabbitmq 服务器的 pyamqp 代理库,任务以循环方式处理。见下文证明。

似乎处理的订单任务是由代理决定的,而不是实际的后端(rabbitmq vs redis 不是问题)。

软件版本:

$ pip freeze | egrep "celery|kombu|amqp"
amqp==2.5.2
celery==4.4.2
kombu==4.6.8
from time import sleep

@app.task
def sleepy(name):
    print(f"Processing: {name}")
    sleep(0.5)

然后在另一个 shell 中,将任务排队:

from time import sleep

def queue_them():
    for x in range(50):
        sleepy.apply_async(args=(f"Q1-T{x}",), queue="Q1")
    sleep(0.1)
    for x in range(20):
        sleepy.apply_async(args=(f"Q2-T{x}",), queue="Q2")
    sleep(0.1)
    sleepy.apply_async(args=("Q3-T0",), queue="Q3")
    for x in range(30):
        sleepy.apply_async(args=(f"Q2MOAR-T{x}",), queue="Q2")

# setup - get celery to setup the queues and exchanges
sleepy.apply_async(args=("nothing",), queue="Q1")
sleepy.apply_async(args=("nothing",), queue="Q2")
sleepy.apply_async(args=("nothing",), queue="Q3")

# run the test
queue_them()

在另一个 shell 中,运行 celery:

$ celery worker -A myapp.celery --pool=prefork --concurrency=2 -Ofair --queues=Q1,Q3,Q2

[2020-05-05 21:59:11,547] WARNING [celery.redirected:235] Processing: Q1-T1
[2020-05-05 21:59:11,547] WARNING [celery.redirected:235] Processing: Q1-T0
[2020-05-05 21:59:12,052] WARNING [celery.redirected:235] Processing: Q1-T2
[2020-05-05 21:59:12,053] WARNING [celery.redirected:235] Processing: Q1-T3
[2020-05-05 21:59:12,556] WARNING [celery.redirected:235] Processing: Q1-T5
[2020-05-05 21:59:12,556] WARNING [celery.redirected:235] Processing: Q1-T4
[2020-05-05 21:59:13,062] WARNING [celery.redirected:235] Processing: Q1-T6
[2020-05-05 21:59:13,063] WARNING [celery.redirected:235] Processing: Q1-T7
[2020-05-05 21:59:13,565] WARNING [celery.redirected:235] Processing: Q1-T9
[2020-05-05 21:59:13,565] WARNING [celery.redirected:235] Processing: Q1-T8
[2020-05-05 21:59:14,069] WARNING [celery.redirected:235] Processing: Q1-T10
[2020-05-05 21:59:14,069] WARNING [celery.redirected:235] Processing: Q3-T0
[2020-05-05 21:59:14,571] WARNING [celery.redirected:235] Processing: Q2-T0
[2020-05-05 21:59:14,572] WARNING [celery.redirected:235] Processing: Q2-T1
[2020-05-05 21:59:15,078] WARNING [celery.redirected:235] Processing: Q1-T11
[2020-05-05 21:59:15,078] WARNING [celery.redirected:235] Processing: Q2-T2
[2020-05-05 21:59:15,581] WARNING [celery.redirected:235] Processing: Q2-T3
[2020-05-05 21:59:15,581] WARNING [celery.redirected:235] Processing: Q1-T12
[2020-05-05 21:59:16,084] WARNING [celery.redirected:235] Processing: Q1-T13
[2020-05-05 21:59:16,084] WARNING [celery.redirected:235] Processing: Q2-T4
[2020-05-05 21:59:16,586] WARNING [celery.redirected:235] Processing: Q1-T14
[2020-05-05 21:59:16,586] WARNING [celery.redirected:235] Processing: Q2-T5
[2020-05-05 21:59:17,089] WARNING [celery.redirected:235] Processing: Q1-T15
[2020-05-05 21:59:17,089] WARNING [celery.redirected:235] Processing: Q2-T6
[2020-05-05 21:59:17,591] WARNING [celery.redirected:235] Processing: Q1-T16
[2020-05-05 21:59:17,592] WARNING [celery.redirected:235] Processing: Q2-T7
[2020-05-05 21:59:18,094] WARNING [celery.redirected:235] Processing: Q1-T17
[2020-05-05 21:59:18,094] WARNING [celery.redirected:235] Processing: Q2-T8
[2020-05-05 21:59:18,597] WARNING [celery.redirected:235] Processing: Q1-T18
[2020-05-05 21:59:18,597] WARNING [celery.redirected:235] Processing: Q2-T9
[2020-05-05 21:59:19,102] WARNING [celery.redirected:235] Processing: Q1-T19
[2020-05-05 21:59:19,102] WARNING [celery.redirected:235] Processing: Q1-T20
[2020-05-05 21:59:19,607] WARNING [celery.redirected:235] Processing: Q1-T21
[2020-05-05 21:59:19,607] WARNING [celery.redirected:235] Processing: Q1-T22
[2020-05-05 21:59:20,110] WARNING [celery.redirected:235] Processing: Q1-T23
[2020-05-05 21:59:20,110] WARNING [celery.redirected:235] Processing: Q2-T10
[2020-05-05 21:59:20,614] WARNING [celery.redirected:235] Processing: Q1-T24
[2020-05-05 21:59:20,614] WARNING [celery.redirected:235] Processing: Q2-T11
[2020-05-05 21:59:21,118] WARNING [celery.redirected:235] Processing: Q1-T25
[2020-05-05 21:59:21,118] WARNING [celery.redirected:235] Processing: Q1-T26
[2020-05-05 21:59:21,622] WARNING [celery.redirected:235] Processing: Q2-T12
[2020-05-05 21:59:21,622] WARNING [celery.redirected:235] Processing: Q1-T27
[2020-05-05 21:59:22,124] WARNING [celery.redirected:235] Processing: Q1-T28
[2020-05-05 21:59:22,124] WARNING [celery.redirected:235] Processing: Q2-T13
[2020-05-05 21:59:22,627] WARNING [celery.redirected:235] Processing: Q2-T14
[2020-05-05 21:59:22,627] WARNING [celery.redirected:235] Processing: Q1-T29
[2020-05-05 21:59:23,129] WARNING [celery.redirected:235] Processing: Q1-T31
[2020-05-05 21:59:23,129] WARNING [celery.redirected:235] Processing: Q1-T30
[2020-05-05 21:59:23,631] WARNING [celery.redirected:235] Processing: Q2-T15
[2020-05-05 21:59:23,632] WARNING [celery.redirected:235] Processing: Q1-T32
[2020-05-05 21:59:24,134] WARNING [celery.redirected:235] Processing: Q1-T33
[2020-05-05 21:59:24,134] WARNING [celery.redirected:235] Processing: Q2-T16
[2020-05-05 21:59:24,636] WARNING [celery.redirected:235] Processing: Q2-T17
[2020-05-05 21:59:24,636] WARNING [celery.redirected:235] Processing: Q2-T18
[2020-05-05 21:59:25,138] WARNING [celery.redirected:235] Processing: Q2-T19
[2020-05-05 21:59:25,139] WARNING [celery.redirected:235] Processing: Q1-T34
[2020-05-05 21:59:25,641] WARNING [celery.redirected:235] Processing: Q1-T35
[2020-05-05 21:59:25,642] WARNING [celery.redirected:235] Processing: Q2MOAR-T0
[2020-05-05 21:59:26,144] WARNING [celery.redirected:235] Processing: Q1-T36
[2020-05-05 21:59:26,144] WARNING [celery.redirected:235] Processing: Q1-T37
[2020-05-05 21:59:26,649] WARNING [celery.redirected:235] Processing: Q2MOAR-T1
[2020-05-05 21:59:26,649] WARNING [celery.redirected:235] Processing: Q1-T38
[2020-05-05 21:59:27,153] WARNING [celery.redirected:235] Processing: Q2MOAR-T2
[2020-05-05 21:59:27,154] WARNING [celery.redirected:235] Processing: Q1-T39
[2020-05-05 21:59:27,656] WARNING [celery.redirected:235] Processing: Q2MOAR-T3
[2020-05-05 21:59:27,656] WARNING [celery.redirected:235] Processing: Q2MOAR-T4
[2020-05-05 21:59:28,159] WARNING [celery.redirected:235] Processing: Q2MOAR-T5
[2020-05-05 21:59:28,160] WARNING [celery.redirected:235] Processing: Q1-T40
[2020-05-05 21:59:28,664] WARNING [celery.redirected:235] Processing: Q2MOAR-T6
[2020-05-05 21:59:28,664] WARNING [celery.redirected:235] Processing: Q1-T41
[2020-05-05 21:59:29,167] WARNING [celery.redirected:235] Processing: Q2MOAR-T7
[2020-05-05 21:59:29,167] WARNING [celery.redirected:235] Processing: Q1-T42

当 celery 以 1 的并发运行时,结果类似:

[2020-05-05 22:01:33,879] WARNING [celery.redirected:235] Processing: Q1-T0
[2020-05-05 22:01:34,385] WARNING [celery.redirected:235] Processing: Q1-T1
[2020-05-05 22:01:34,888] WARNING [celery.redirected:235] Processing: Q1-T2
[2020-05-05 22:01:35,391] WARNING [celery.redirected:235] Processing: Q1-T3
[2020-05-05 22:01:35,894] WARNING [celery.redirected:235] Processing: Q1-T4
[2020-05-05 22:01:36,397] WARNING [celery.redirected:235] Processing: Q1-T5
[2020-05-05 22:01:36,899] WARNING [celery.redirected:235] Processing: Q3-T0
[2020-05-05 22:01:37,404] WARNING [celery.redirected:235] Processing: Q2-T0
[2020-05-05 22:01:37,907] WARNING [celery.redirected:235] Processing: Q2-T1
[2020-05-05 22:01:38,411] WARNING [celery.redirected:235] Processing: Q1-T6
[2020-05-05 22:01:38,913] WARNING [celery.redirected:235] Processing: Q2-T2
[2020-05-05 22:01:39,417] WARNING [celery.redirected:235] Processing: Q2-T3
[2020-05-05 22:01:39,919] WARNING [celery.redirected:235] Processing: Q2-T4
[2020-05-05 22:01:40,422] WARNING [celery.redirected:235] Processing: Q1-T7
[2020-05-05 22:01:40,925] WARNING [celery.redirected:235] Processing: Q2-T5
[2020-05-05 22:01:41,429] WARNING [celery.redirected:235] Processing: Q1-T8
于 2020-05-05T12:06:44.403 回答