我的应用程序的生产者模块由想要提交要在小型集群上完成的工作的用户运行。它通过 RabbitMQ 消息代理以 JSON 格式发送订阅。
我尝试了几种策略,到目前为止最好的是以下策略,但仍未完全奏效:
每台集群机器运行一个消费者模块,它自己订阅 AMQP 队列并发出一个prefetch_count来告诉代理它一次可以运行多少个任务。
我能够使用 Pika AMQP 库中的 SelectConnection 使其工作。消费者和生产者都启动两个通道,一个连接到每个队列。生产者在通道 [A] 上发送请求并在通道 [B] 中等待响应,消费者在通道 [A] 上等待请求并在通道 [B] 上发送响应。然而,似乎当消费者运行计算响应的回调时,它会阻塞,所以我每次只在每个消费者处执行一个任务。
我到底需要什么:
- 消费者 [A] 将他的任务(每次大约 5k)订阅到集群
- 代理为每个消费者分派 N 条消息/请求,其中 N 是它可以处理的并发任务数
- 当单个任务完成时,消费者用结果回复代理/生产者
- 生产者收到回复,更新计算状态,最后打印一些报告
限制:
- 如果另一个用户提交工作,他的所有任务都将排在前一个用户之后(我想这从队列系统中自动出现,但我没有考虑对线程环境的影响)
- 任务有提交顺序,但回复顺序不重要
更新
我进行了更深入的研究,我的实际问题似乎是我使用一个简单的函数作为 pika 的 SelectConnection.channel.basic_consume() 函数的回调。我的最后一个(未实现的)想法是传递一个线程函数,而不是常规函数,这样回调就不会阻塞,消费者可以继续收听。