我有一个具有以下配置的 RabbitMQ 设置。
- 每个交易所都是
FANOUT
类型 - 多个队列附加到每个 Exchange。
BlockingConnection
由消费者制造。- 单个消费者处理所有回调。
问题 -
一些有效负载比其他有效负载需要更长的时间来处理,这导致消费者即使在其他队列中有有效负载时也保持空闲状态。
问题 -
- 我应该如何实施消费者以避免长时间等待?我应该为每个模块运行单独的消费者吗?任何用户体验?
- 我可以配置 RabbitMQ 来处理这些情况吗?如果是这样怎么办。?
我有一个具有以下配置的 RabbitMQ 设置。
FANOUT
类型BlockingConnection
由消费者制造。问题 -
一些有效负载比其他有效负载需要更长的时间来处理,这导致消费者即使在其他队列中有有效负载时也保持空闲状态。
问题 -
首先,您使用的是什么编程语言?大多数常用语言,如 python、java、c#,都支持为并行进程创建额外的线程。
假设您使用下面的队列(伪代码):
def callback(ch, method, properties, body) ...
def threaded_function(ch, method, properties, body) ...
channel.basic_qos(prefetch_count=3)
channel.basic_consume(callback, queue='task_queue')
channel.start_consuming()
首先,设置“prefetch_count=3”允许您的消费者同时拥有最多 3 条处于未确认状态的消息。
在回调方法中,您应该使用 threaded_function 启动一个线程来执行每条消息。在 threaded_function 方法体的末尾,执行以下操作:
ch.basic_ack(delivery_tag = method.delivery_tag)
这样,最多可以同时处理3条消息,即使其中一个或两个线程需要更长的时间运行,其他线程仍然可以处理下一条消息。
首先,很高兴知道为什么您有多个扇出交换?你真的需要这个吗?扇出交换将消息发送到所有队列...