13

也许我问这个问题很愚蠢,但在我做进一步的工作之前,我需要先了解基本概念。

我正在处理几千个 RSS 提要,使用多个 Celery 工作节点和一个 RabbitMQ 节点作为代理。每个提要的 URL 都作为消息写入队列中。工作人员只是从队列中读取 URL 并开始处理它。我必须确保两个工作人员不会同时处理单个 RSS 提要。

确保一次只执行一个任务一文提出了一种基于 Memcahced 的解决方案,用于在处理提要时锁定提要。

但我想了解的是,为什么我需要使用 Memcached(或其他东西)来确保 RabbitMQ 队列上的消息不会被多个工作人员同时使用。我可以在 RabbitMQ(或 Celery)中进行一些配置更改以实现此目标吗?

4

3 回答 3

5

在正常工作设置中,多个消费者肯定不会看到单个 MQ 消息。您必须为涉及失败/崩溃工人的案例做一些工作,阅读自动确认和消息拒绝,但基本案例是合理的。

我在您链接的文章中没有看到同步队列(阅读:MQ),因此(据我所知)他们使用锁定机制(阅读:memcache)进行同步,作为替代方案。我可以想到一些在适当的 MQ 设置中不会出现的问题。

于 2012-08-28T09:29:30.063 回答
4

正如其他人所指出的,您正在混合苹果和橙子。

作为 celery 任务和 MQ 消息。

您可以确保一条消息将仅由一个工作人员同时处理。

例如。

@task(...)
def my_task(

my_task.apply(1)

.apply 将消息发布到您正在使用的消息代理(rabbit,redis ...)。然后消息将被路由到一个队列并由一个工作人员使用。您不需要为此锁定,您可以免费获得它:)

celery cookbook 上的示例显示了如何防止两条类似的消息 (my_task.apply(1)) 同时运行,这是您需要在任务本身中确保的内容。

当然,您需要可以从所有工作人员(memcached,redis ...)访问的东西,因为它们可能在不同的机器上运行。

于 2012-08-28T10:07:04.713 回答
2

提到的示例通常用于其他目标:它​​阻止您使用具有相同含义的不同消息(不是相同的消息)。例如,我有两个进程:第一个进程将一些 URL 放入队列,第二个进程 - 从队列中获取 URL 并获取它们。如果第一个进程将一个 URL 排队两次(甚至更多次)会怎样?

PS我用于此目的的Redis存储和setnx操作(只能设置一次密钥)。

于 2012-08-28T06:10:51.127 回答