9

我的应用程序的生产者模块由想要提交要在小型集群上完成的工作的用户运行。它通过 RabbitMQ 消息代理以 JSON 格式发送订阅。

我尝试了几种策略,到目前为止最好的是以下策略,但仍未完全奏效:

每台集群机器运行一个消费者模块,它自己订阅 AMQP 队列并发出一个prefetch_count来告诉代理它一次可以运行多少个任务。

我能够使用 Pika AMQP 库中的 SelectConnection 使其工作。消费者和生产者都启动两个通道,一个连接到每个队列。生产者在通道 [A] 上发送请求并在通道 [B] 中等待响应,消费者在通道 [A] 上等待请求并在通道 [B] 上发送响应。然而,似乎当消费者运行计算响应的回调时,它会阻塞,所以我每次只在每个消费者处执行一个任务。

我到底需要什么:

  1. 消费者 [A] 将他的任务(每次大约 5k)订阅到集群
  2. 代理为每个消费者分派 N 条消息/请求,其中 N 是它可以处理的并发任务数
  3. 当单个任务完成时,消费者用结果回复代理/生产者
  4. 生产者收到回复,更新计算状态,最后打印一些报告

限制:

  • 如果另一个用户提交工作,他的所有任务都将排在前一个用户之后(我想这从队列系统中自动出现,但我没有考虑对线程环境的影响)
  • 任务有提交顺序,但回复顺序不重要

更新

我进行了更深入的研究,我的实际问题似乎是我使用一个简单的函数作为 pika 的 SelectConnection.channel.basic_consume() 函数的回调。我的最后一个(未实现的)想法是传递一个线程函数,而不是常规函数,这样回调就不会阻塞,消费者可以继续收听。

4

3 回答 3

4

正如您所注意到的,您的进程在运行回调时会阻塞。有几种方法可以解决这个问题,具体取决于你的回调做什么。

如果您的回调是 IO 绑定的(进行大量网络或磁盘 IO),您可以使用线程或基于 greenlet 的解决方案,例如geventeventlet温室。但请记住,Python 受到 GIL(全局解释器锁)的限制,这意味着只有一段 Python 代码在单个 Python 进程中运行。这意味着如果您使用 python 代码进行大量计算,这些解决方案可能不会比您已有的解决方案快多少。

另一种选择是使用multiprocessing将您的消费者实现为多个进程。我发现多处理在进行并行工作时非常有用。您可以通过使用Queue来实现这一点,让父进程作为消费者并将工作分担给它的子进程,或者通过简单地启动多个进程来实现,每个进程都自己消费。我建议,除非您的应用程序是高度并发的(1000 个工作人员),否则只需启动多个工作人员,每个工作人员都从自己的连接中消耗。这样,您可以使用 AMQP 的确认功能,因此如果消费者在处理任务时死亡,消息会自动发送回队列并由另一个工作人员接收,而不是简单地丢失请求。

如果您控制生产者并且它也是用 Python 编写的,最后一个选择是使用像celery这样的任务库来为您抽象任务/队列工作。我在几个大型项目中使用过 celery,并且发现它写得很好。它还将通过适当的配置为您处理多个消费者问题。

于 2013-04-08T18:42:28.373 回答
0

由于缺乏线程经验,我的设置将运行多个消费者进程(其数量基本上是您的预取计数)。每个人都会连接到两个队列,他们会愉快地处理工作,不知道彼此的存在。

于 2013-03-06T11:00:51.870 回答
0

你的设置对我来说听起来不错。你是对的,你可以简单地设置回调来启动一个线程,并在线程完成时将其链接到一个单独的回调,以便通过通道 B 将响应排队返回。

基本上,您的消费者应该有自己的队列(N 的大小,他们支持的并行度)。当一个请求通过通道 A 进来时,它应该将结果存储在与 Pika 的主线程和线程池中的工作线程共享的队列中。一旦它进入队列,pika 应该以 ACK 响应,并且您的工作线程将唤醒并开始处理。

一旦工作人员完成其工作,它会将结果排队返回到单独的结果队列中,并向主线程发出回调以将其发送回消费者。

您应该注意并确保工作线程在使用任何共享资源时不会相互干扰,但这是一个单独的主题。

于 2012-04-08T14:20:49.900 回答