12

对于基于 Python/Django/Celery 的部署工具,我们有以下设置:

  1. 我们目前使用默认的 Celery 设置。(一个队列+交换称为“芹菜”。)
  2. 队列中的每个任务代表一个部署操作。
  3. 环境的每个任务都以可能需要(非常)长时间的同步阶段结束。

需要满足以下规范:

  1. 并发性:多个环境的任务应该同时执行。
  2. 锁定:每个环境最多可能同时运行一个任务(即环境锁定)。
  3. 吞吐量优化:当单个环境有多个任务时,可以合并它们的同步阶段进行优化。因此,如果一个任务即将结束,它应该检查队列中是否有新任务在等待这个环境,如果有,则跳过其同步阶段。

实现这一点的首选方法是什么?

一些想法:

  • 我想说我们必须设置多个队列:每个环境一个,并且让N celery worker 专门处理一个队列,每个。(这将解决规范 1+2。)
    但是我们如何让多个 celery worker 专门监听不同的队列呢?
  • 是否有一种干净的方法可以知道队列中有更多任务在等待环境?
4

2 回答 2

2

对于 1,2,使用多个队列并使用 -Q 启动工作人员来指定要监听的队列。还配置 CELERYD_PREFETCH_MULTIPLIER = 1,一次仅用于一项任务。

要获得队列长度(使用 rabbitmq 测试),您可以使用以下内容:

from kombu.connection import BrokerConnection
connection = BrokerConnection(BROKER_HOST, BROKER_USER...)
channel = connection.channel()
q, j, c = channel.queue_declare('celery', passive=True)
print 'celery %d jobs in queue' % j

'queue_delcare' 作为副作用,给你队列的长度。希望这可以帮到你。

于 2011-04-07T10:13:29.990 回答
1

我会看看zeromq它可以在一个超快速库中进行消息传递和多线程。它还支持大量语言并内置负载平衡。

于 2011-04-05T18:43:33.767 回答