13

我想用芹菜实现一个分布式作业执行系统。鉴于 rabbitMQ 不支持优先级并且我非常需要这个功能,所以我转向了 celery+redis。

在我的情况下,任务与硬件密切相关,例如,任务 A 只能在 Worker 1 上运行,因为只有 Worker 1 的 PC 拥有必要的硬件。我将每个工作人员的 CONCURRENCY 设置为 1,以便工作人员每次只运行一个任务。每个任务大约需要 2 分钟。

为了实现优先级功能,首先我尝试priority在调用时添加参数apply_async(),例如apply_async(priority=0)and apply_async(priority=9)。在这个测试中,我只启动了一个 COCURRENCY=1 的 Worker,并以不同的优先级一个一个地启动了 10 个任务。我希望看到启动的任务apply_async(priority=0)将优先运行,但不幸的是它们只是作为启动顺序启动。

然后我尝试做一些工作。我克隆了每个任务,所以对于每个任务我都有 task_high 和 task_low,由@celery.task(priority=0)and装饰@celery.task(priority=1)。然后我做了和上面一样的测试,这次更好,当启动顺序是“HH-LLLL-HHHH”时,真正的顺序是“HH-LHHLHLLH”。我想redis在这里做了一些调度和平衡工作。

但这仍然不能满足我的期望。我希望得到像“HHHHHH-LLLL”这样的订单,因为对于某些任务,我只有一台合适的机器和必要的硬件,并希望高优先级的任务尽快运行。

我在 Internet 上搜索了其他工作,例如使用两个队列,一个用于高优先级任务,另一个用于低优先级任务,前者使用 2 台机器,后者使用 1 台机器。但由于我的硬件非常有限,这对我不起作用。

你能提出一些建议吗?

4

2 回答 2

21

Celery Redis 传输确实尊重优先级字段,但 Redis 本身没有优先级的概念。

通过为每个队列创建 n 个列表并在 BRPOP 命令中使用该顺序来实现优先级支持。我n之所以在这里说是因为即使有 10 (0-9) 个优先级,默认情况下这些都合并为 4 个级别以节省资源。这意味着一个名为的队列celery实际上会被拆分为 4 个队列:

['celery0', 'celery3`, `celery6`, `celery9`]

如果您想要更多优先级,您可以设置priority_steps传输选项:

BROKER_TRANSPORT_OPTIONS = {
    'priority_steps': list(range(10)),
}

也就是说,请注意,这永远不会像在服务器级别实现的优先级那样好,并且最多可能是近似的。但它对于您的应用程序可能仍然足够好。

于 2013-03-06T11:55:45.550 回答
2

Celery 关于 redis 消息优先级的文档在这里redis-message-priorities,您可以自定义优先级。以 10 为例:

  1. 设置 priority_steps 传输选项
app.conf.broker_transport_options = {
    'priority_steps': list(range(10)),
    'queue_order_strategy': 'priority',
}
  1. 以正常方式启动芹菜工人
celery -A tasks worker --loglevel=info
  1. 调用任务,0 为最高优先级,9 为最低优先级
custom_priority=5 
task.apply_async(args=[arg1, arg2], kwargs={'kwarg1': 'x', 'kwarg2': 'y'},priority=custom_priority)
于 2020-12-14T09:52:49.057 回答