2

我正在使用bunny ruby​​ gem向 rabbitmq 服务器发送和接收消息。如何在等待多长时间超时的同时从队列中同步弹出消息(即,如果 3 秒后没有消息到达,则停止阻塞)?

一个明显的解决方案是只循环 pop 调用,直到超时过期或收到消息,但这似乎非常低效。有没有更优雅的解决方案?我查看了 bunny 的文档以及 rabbitmq 网站上的教程,但我没有找到针对这种特定场景的解决方案。

4

3 回答 3

2

为了使这样的功能 a 被迫重写基本方法订阅。我发现我们可以为通道设置超时时间,但是函数中没有这样的输入参数。

response = nil

subscribe(block: true, timeout: 10) do |delivery_info, properties, payload|
  Rails.logger.info "got message #{payload}"
  response = payload
  @channel.consumers[delivery_info.consumer_tag].cancel
end



def subscribe(opts = {block: false}, &block)
    ctag = opts.fetch(:consumer_tag, @channel.generate_consumer_tag)
    consumer = Bunny::Consumer.new(@channel,@response_queue,ctag)

    consumer.on_delivery(&block)
    @channel.basic_consume_with(consumer)

    if opts[:block]
      @channel.work_pool.join(opts[:timeout])
    end
end
于 2016-02-01T09:28:16.803 回答
1

我没有找到使用 Bunny 轻松完成此操作的方法,而且我在这里提出的建议不会超时。但它确实支持每次调用语义检索一条消息。鉴于 Bunny 内部使用线程池来接收消息,我认为更简单的方法可能是使用阻塞队列(例如 Ruby 的Queue类)将消息从 Bunny 的线程池传输到调用线程。类似于以下内容:

# Set up your internal queue somewhere (in your class's initialize maybe?)
@internal_queue = Queue.new

# In the main thread that needs to block
...
# the call to subscribe is non-blocking
queue.subscribe do |delivery_info, properties, payload|
  @internal_queue.enq(payload)  # this runs inside Bunny's pool
end

# the call to deq is blocking
response = @internal_queue.deq  # this blocks the main thread till a
                                # message is pushed to the internal_q

您可以为需要监听的每个 AMQP 通道维护一个 @internal_queue。您可以将这些部分分解为单独的方法,并制作一个简洁的阻塞 API,一次返回一条消息。

后来我创建了一个 TimedWaitableQueue 类,它包装了一个使用监视器 MonitorMixin 扩展的简单数组,然后使用互斥体 + 条件变量语义。这允许阻塞具有超时的出队调用。

于 2016-08-04T16:39:58.603 回答
0

作为@Ilya 对上述代码的细微改动:https ://stackoverflow.com/a/35126963/448858我发现我必须创建一个线程超时然后关闭通道的工作池

module Bunny
  class Queue
    def subscribe(opts = { block: false, timeout: 1000 }, &block)
      ctag = opts.fetch(:consumer_tag, @channel.generate_consumer_tag)
      consumer = Consumer.new(@channel, self, ctag)
      consumer.on_delivery(&block)
      @channel.basic_consume_with(consumer)
      if opts[:block]
        Thread.new do
          sleep(opts[:timeout]/1000.0)
          @channel.work_pool.shutdown
        end
        @channel.work_pool.join
      end
    end
  end
end
于 2019-07-19T15:18:12.100 回答