15

我在一个基于多租户云的应用程序的 Web 应用程序上工作(许多客户端,每个客户端都有自己独立的“环境”,但都在共享的硬件集上),我们正在引入用户批量处理的能力为以后的处理工作。批处理工作的类型真的不重要,只是数量足够,没有工作队列就不太实际。我们选择 RabbitMQ 作为我们的底层队列框架。

因为我们是一个多租户应用程序,我们不一定希望客户端能够为另一个客户端造成冗长的队列处理时间,所以我们提出的一个想法是在每个客户端的基础上创建一个队列并拥有指向我们所有客户端队列的共享工作池。问题是,据我所知,工作人员直接绑定到特定队列,而不是交换。在我们的理想世界中,我们的客户端队列仍将在没有一个客户端阻塞另一个客户端的情况下从共享工作池中处理,我们可以通过启动更多工作人员或关闭空闲工作人员来根据需要扩大或缩小该工作池。将工作人员绑定到特定队列实际上可以防止我们这样做,因为我们经常有很多工作人员只是在没有活动的队列上闲置。

有没有相对直接的方法来实现这一点?我对 RabbitMQ 还很陌生,还没有真正能够完成我们所追求的目标。我们也不想编写一个非常复杂的多线程消费者应用程序,这是我们可能负担不起的开发和测试时间。我们的堆栈是基于 Windows/.Net/C# 的,如果这很重要的话,但我认为这不会对手头的问题产生重大影响。

4

4 回答 4

5

您可以查看优先级队列实现(最初提出此问题时未实现):https ://www.rabbitmq.com/priority.html

如果这对您不起作用,您可以尝试其他一些技巧来实现您想要的(应该适用于旧版本的 RabbitMQ):

您可以将 100 个队列绑定到主题交换并将路由键设置为用户 ID % 100 的散列,即每个任务将具有 1 到 100 之间的键,并且同一用户的任务将具有相同的键。每个队列都绑定了 1 到 100 之间的唯一模式。现在您有一组工作人员,它们以随机队列编号开始,然后在每个作业之后增加该队列编号,再次 %100 以在队列 100 之后循环回到队列 1。

现在,您的工作人员队列可以并行处理多达 100 个唯一用户,或者如果没有其他工作要做,所有工作人员都可以专注于单个用户。如果工作人员需要在每个作业之间循环遍历所有 100 个队列,那么在只有一个用户在单个队列上有很多作业的情况下,每个作业之间自然会有一些开销。较少数量的队列是解决此问题的一种方法。您还可以让每个工作人员保持与每个队列的连接,并从每个队列中消费最多一条未确认的消息。然后,如果未确认消息超时设置得足够高,则工作人员可以更快地循环遍历内存中的未决消息。

或者,您可以创建两个交换器,每个交换器都有一个绑定队列。所有工作都进入第一个交换和队列,由一组工作人员使用。如果一个工作单元花费的时间太长,工人可以取消它并将其推送到第二个队列。当第一个队列上没有任何内容时,工作人员只会处理第二个队列。您可能还需要几个具有相反队列优先级的工作人员,以确保在有永无止境的短任务流到达时仍然处理长时间运行的任务,以便最终始终处理用户批次。这不会真正将您的工作人员队列分配给所有任务,但它会阻止一个用户长时间运行的任务阻止您的工作人员为同一用户或另一个用户执行短期运行的任务。它还假设您可以取消作业并稍后重新运行它而不会出现任何问题。这也意味着超时并需要以低优先级重新运行的任务会浪费资源。除非你能提前识别快慢任务

如果单个用户有 100 个慢速任务,然后另一个用户发布了一批任务,那么第一个带有 100 个队列的建议也可能有问题。在完成其中一项慢速任务之前,不会查看这些任务。如果这被证明是一个合法的问题,您可以将这两种解决方案结合起来。

于 2016-09-27T22:45:49.150 回答
1

我不明白您为什么不使用 RabbitMQ 的虚拟主机,而是让您的应用程序登录到 RabbitMQ 并在每个用户的单独连接上进行身份验证。

这并不意味着您不能有一个工人主管将工人分配给一个用户或另一个用户。但这确实意味着每个用户的所有消息都由完全独立的交换和队列处理。

于 2012-01-29T06:34:35.617 回答
1

您可以让您的工作人员池都使用相同的唯一队列。然后工作将在他们之间分配,您将能够增加/缩小您的池以增加/减少您的工作处理能力。

于 2011-11-28T20:36:46.690 回答
0

工人被分配了 0+ 个队列,而不是交换。

哪些任务将从每个工作人员的哪些队列中获取的逻辑在通过 指示的类中实现,CELERYD_CONSUMER默认情况下是celery.worker.consumer.Consumer

您可以创建一个自定义消费者类 ro 实现您喜欢的任何逻辑。困难的部分是决定你想使用的“公平”算法的细节;但是一旦你决定了,你就可以通过创建一个自定义的消费者类并将其分配给适当的工作人员来实现它。

于 2013-08-29T18:57:46.713 回答