1

In java RabbitMQ client I can do (code in ruby):

 consumer = QueueingConsumer.new(channel);
 channel.basicConsume(queue_name, true, consumer);
 consumer.nextDelivery.getBody

And then third line blocks thread until message comes. But how to achieve it in Bunny client? I can only use block:

channel.queue('').bind(@x, :routing_key => rk).subscribe(block: true) do |_, _, payload|
  # do something
end

or non blocking pop:

delivery_info, properties, payload = q.pop

Is there a way to achieve it like in jruby client using Bunny? The reason I want is that after receiving a message I would like to continue job in my current context.

4

3 回答 3

2

调用subscribe因传递而阻塞:block => true。如果您需要访问块之外的有效负载,您可以利用 Ruby 的作用域规则:

the_payload = nil
queue = channel.queue('').bind(@x, :routing_key => rk)
queue.subscribe(block: true) do |delivery_info, _, payload|
  the_payload = payload
  channel.consumers[delivery_info.consumer_tag].cancel
end
# the_payload is now the one received in the block!
于 2013-05-21T15:38:42.927 回答
1

Rob Harrop 的回答确实取消了队列,但它并没有结束我的障碍。以下都使用红宝石Queue

require "thread"

unblock = Queue.new # Ruby Queue, not Bunny::Queue
queue = channel.queue('').bind(@x, :routing_key => rk)
consumer = queue.subscribe do |delivery_info, properties, body|
  # do something
  result = determine_if_it_is_time_to_move_on
  unblock.enq true if result
end

unblock.deq # block until a message is enqueued in the ruby Queue
consumer.cancel
于 2016-12-10T02:59:23.123 回答
0

我需要从队列中接收一条消息。Bunny'sQueue#pop是非阻塞的,没有等待的选项。我还需要支持超时,我最终实现了这个:

require "thread"

mutex = Mutex.new
var = ConditionVariable.new
payload = nil
consumer = queue.subscribe(block: false) do |_, _, x_payload|
  mutex.synchronize do
    payload = x_payload
    var.signal
  end
end

mutex.synchronize do
  deadline = Time.now + 10
  while payload.nil? && (remaining = deadline - Time.now) > 0
    var.wait(mutex, remaining)
  end
end

consumer.cancel
raise "timed out waiting for response" if payload.blank?

部分灵感来自https://spin.atomicobject.com/2017/06/28/queue-pop-with-timeout-fixed/

此代码尚未经过实战测试。它适用于使用单个消息进行分期。它可能无法大规模运行。我打算将所有这些复杂性隐藏在一个猴子补丁中Bunny::Queue。调用者会看到一个简单的blocking_pop(timeout:)API。

于 2018-12-14T14:25:11.470 回答