我正在使用bunny ruby gem向 rabbitmq 服务器发送和接收消息。如何在等待多长时间超时的同时从队列中同步弹出消息(即,如果 3 秒后没有消息到达,则停止阻塞)?
一个明显的解决方案是只循环 pop 调用,直到超时过期或收到消息,但这似乎非常低效。有没有更优雅的解决方案?我查看了 bunny 的文档以及 rabbitmq 网站上的教程,但我没有找到针对这种特定场景的解决方案。
我正在使用bunny ruby gem向 rabbitmq 服务器发送和接收消息。如何在等待多长时间超时的同时从队列中同步弹出消息(即,如果 3 秒后没有消息到达,则停止阻塞)?
一个明显的解决方案是只循环 pop 调用,直到超时过期或收到消息,但这似乎非常低效。有没有更优雅的解决方案?我查看了 bunny 的文档以及 rabbitmq 网站上的教程,但我没有找到针对这种特定场景的解决方案。
为了使这样的功能 a 被迫重写基本方法订阅。我发现我们可以为通道设置超时时间,但是函数中没有这样的输入参数。
response = nil
subscribe(block: true, timeout: 10) do |delivery_info, properties, payload|
Rails.logger.info "got message #{payload}"
response = payload
@channel.consumers[delivery_info.consumer_tag].cancel
end
def subscribe(opts = {block: false}, &block)
ctag = opts.fetch(:consumer_tag, @channel.generate_consumer_tag)
consumer = Bunny::Consumer.new(@channel,@response_queue,ctag)
consumer.on_delivery(&block)
@channel.basic_consume_with(consumer)
if opts[:block]
@channel.work_pool.join(opts[:timeout])
end
end
我没有找到使用 Bunny 轻松完成此操作的方法,而且我在这里提出的建议不会超时。但它确实支持每次调用语义检索一条消息。鉴于 Bunny 内部使用线程池来接收消息,我认为更简单的方法可能是使用阻塞队列(例如 Ruby 的Queue
类)将消息从 Bunny 的线程池传输到调用线程。类似于以下内容:
# Set up your internal queue somewhere (in your class's initialize maybe?)
@internal_queue = Queue.new
# In the main thread that needs to block
...
# the call to subscribe is non-blocking
queue.subscribe do |delivery_info, properties, payload|
@internal_queue.enq(payload) # this runs inside Bunny's pool
end
# the call to deq is blocking
response = @internal_queue.deq # this blocks the main thread till a
# message is pushed to the internal_q
您可以为需要监听的每个 AMQP 通道维护一个 @internal_queue。您可以将这些部分分解为单独的方法,并制作一个简洁的阻塞 API,一次返回一条消息。
后来我创建了一个 TimedWaitableQueue 类,它包装了一个使用监视器 MonitorMixin 扩展的简单数组,然后使用互斥体 + 条件变量语义。这允许阻塞具有超时的出队调用。
作为@Ilya 对上述代码的细微改动:https ://stackoverflow.com/a/35126963/448858我发现我必须创建一个线程超时然后关闭通道的工作池
module Bunny
class Queue
def subscribe(opts = { block: false, timeout: 1000 }, &block)
ctag = opts.fetch(:consumer_tag, @channel.generate_consumer_tag)
consumer = Consumer.new(@channel, self, ctag)
consumer.on_delivery(&block)
@channel.basic_consume_with(consumer)
if opts[:block]
Thread.new do
sleep(opts[:timeout]/1000.0)
@channel.work_pool.shutdown
end
@channel.work_pool.join
end
end
end
end