我正在为 RabbitMQ 构建一个 Ruby 工作者,它将每次处理多条消息,并将在多个队列上工作。在 RabbitMQ 中,我有:Queue1、Queue2、Queue3 ...
我需要快速加载大包数据包(使用 basic_get 我可以加载 65535 条消息而无需询问)。下面的代码运行良好,直到我们转移到新服务器。队列和工作人员之间的延迟上升,我每秒只能加载 50 条基本 get 消息。
这是我的工作流程:
def work_iteration
cache = queue_manager.messages
size = cache.size
return if size == 0
delivery_tag = process(cache)
queue_manager.ack(delivery_tag)
size
end
在 QueueManager.rb 我有
def messages
MULTIPLE_MESSAGES.times.reduce([]) do |s, _|
result = basic_get
break s if result.nil? || result.last.nil?
s << result
end
end
def ack(delivery_tag)
channel.ack(delivery_tag, true)
end
我发现的另一个解决方法是使用订阅方法。但我还没有找到打破内循环订阅的方法。
queue_manager.subscribe do |delivery_info, properties, payload|
cache << [delivery_info, properties, payload]
if cache.size >= 65_000
dt = process(cache)
queue_manager.ack(dt)
end
end
我用谷歌搜索,发现这对我有用:
length = queue.size
cache = []
consumer = queue.subscribe(block: false) do |delivery_info, properties, payload|
cache << [delivery_info, properties, payload]
if cache.size >= length
dt = process(cache)
queue.ack(dt)
consumer.cancel
end
end
- 但是在主线程中会发生什么?
- 如何正确阻塞主线程,并在处理完成时解除阻塞?
- 还有其他方法可以从队列中加载多条消息,处理它们,关闭消费者并移动到下一个队列吗?