5

我已经开始使用 RQ / Redis 为我的 django 站点构建一些长时间运行的作业的异步执行。我希望做以下事情:

  • 我想要一个模型的每个实例都有一个队列。你可以把这个模型想象成一个 api 用户帐户。(不会很多。最多 15 - 20 个)

  • 我将在队列中平均分配一批任务(从 10 到 500 个)。在第一批完成之前,可以添加多个批次。

  • 对于每个批次,我想为每个没有积极工作的队列启动一个工作人员,并且我想以批处理模式运行这些工作人员,这样一旦他们用完任务,他们就会关闭。

  • 我意识到我不能以批处理模式运行它们,然后我将一直在工作/监听所有队列的工作。这样做的问题是我希望能够动态地添加和删除队列,因此最好在每个批次中启动可用队列。

我意识到我在队列中分配任务似乎很奇怪,但原因是同一队列中的每个任务都必须根据我正在使用的服务进行速率限制/节流(将其视为 API速率限制,但每个队列代表一个不同的帐户)。但就我的目的而言,任务在哪个账户上运行没有区别,所以我不妨在所有账户上并行化。

我面临的问题是,如果我启动一个工作人员并给它一个已经在处理的队列,我现在有两个工作人员在该队列上独立运行,因此我的预期节流率降低了一半。如果该队列上还没有工作人员,我如何才能启动一个工作人员?我可能会找到一个 hacky 解决方案,但我更愿意以“正确”的方式处理它,因为我对队列没有太多经验,所以我想我应该问一下。

我已经在实现我自己的工人类,以便我可以动态控制队列,所以我只需要一种方法来添加逻辑,如果该队列已经在处理,它将不会被赋予新的工人。我的工人的一个简单版本在这里:

# custom_worker.py
import sys
from Api.models import *
from rq import Queue, Connection, Worker

# importing the necessary namespace for the tasks to run
from tasks import *

# dynamically getting the queue names in which I am expecting tasks
queues = [user.name for user in ApiUser.objects.all()]

with Connection():

    qs = list(map(Queue, queues)) or [Queue()]

    w = Worker(qs)

    w.work(burst=True)
4

1 回答 1

3

找到解决方案只是意味着深入研究 python-rq 的源代码。我可能会考虑改进文档。无论如何,这似乎满足我的需求!

import sys
from Api.models import *
from rq import Queue, Connection, Worker

# importing the necessary namespace for the tasks to run
from tasks import *

# Provide queue names to listen to as arguments to this script,
with Connection():

    current_workers = Worker.all()
    working_queues = [queue.name for worker in current_workers for queue in worker.queues]
    proposed_queues = [user.name for user in ApiUser.objects.all()]
    queues_to_start = [queue for queue in proposed_queues if not queue in working_queues]

    if len(queues_to_start) > 0:
        qs = list(map(Queue, queues_to_start))
        w = Worker(qs)
        w.work(burst=True)
    else:
        print("Nothing to do here.")
于 2014-10-12T21:13:13.057 回答