2

如果在线程中,我无法让队列订阅块执行。

rubybunny/exchanges中的示例按预期工作。但是,如果与线程中的消费者部分相适应,则订阅者块似乎不会执行。

我尝试了几种简单的变体,包括设置共享变量标志,但都没有成功。

我错过了什么?

代码
#!/usr/bin/env ruby
require "bunny"

quit = false

consumer = Thread.new do
  puts "consumer start"

  cnx = Bunny.new
  cnx.start
  cn  = cnx.create_channel
  ex = cn.topic("weathr", :auto_delete => true)

  q = cn.queue("", :exclusive => true).bind(ex, :routing_key => "americas.north.#")
  q.subscribe do |delivery_info, properties, payload|
    puts "An update for North America: #{payload}, routing key is #{delivery_info.routing_key}"
  end

  loop {
    sleep 1
    break if quit
  }

  cnx.close
  puts "consumer done"
end

connection = Bunny.new
connection.start
connection  = connection.create_channel
exchange = connection.topic("weathr", :auto_delete => true)
exchange.publish("San Diego update", :routing_key => "americas.north.us.ca.sandiego").
  publish("Berkeley update",         :routing_key => "americas.north.us.ca.berkeley").
  publish("San Francisco update",    :routing_key => "americas.north.us.ca.sanfrancisco").
  publish("New York update",         :routing_key => "americas.north.us.ny.newyork").
  publish("São Paolo update",        :routing_key => "americas.south.brazil.saopaolo").
  publish("Hong Kong update",        :routing_key => "asia.southeast.hk.hongkong").
  publish("Kyoto update",            :routing_key => "asia.southeast.japan.kyoto").
  publish("Shanghai update",         :routing_key => "asia.southeast.prc.shanghai").
  publish("Rome update",             :routing_key => "europe.italy.roma").
  publish("Paris update",            :routing_key => "europe.france.paris")

sleep 5
connection.close

quit = true
consumer.join
实际输出
consumer start
consumer done
预期产出
consumer start
An update for North America: San Diego update, routing key is americas.north.us.ca.sandiego
An update for North America: Berkeley update, routing key is americas.north.us.ca.berkeley
An update for North America: San Francisco update, routing key is americas.north.us.ca.sanfrancisco
An update for North America: New York update, routing key is americas.north.us.ny.newyork
consumer done
4

1 回答 1

2

线程的订阅块没有执行,因为队列根本没有收到任何消息。详细地说,在这种情况下,队列最终是在消息发布后创建的。

这可以通过将消息切换到:mandatory => true并使用来可视化Bunny::Exchange#on_return

代码
#!/usr/bin/env ruby
require "bunny"

quit = false

connection = Bunny.new
connection.start

consumer = Thread.new do
  puts "consumer start"
  cn  = connection.create_channel
  ex = cn.topic("weathr", :auto_delete => true)

  q = cn.queue("", :exclusive => true).bind(ex, :routing_key => "americas.north.#")
  q.subscribe do |delivery_info, properties, payload|
    puts "An update for North America: #{payload}, routing key is #{delivery_info.routing_key}"
  end

  sleep 1 while !quit

  cn.close
  puts "consumer done"
end

channel = connection.create_channel
exchange = channel.topic("weathr", :auto_delete => true)
exchange.on_return do |basic_return, properties, payload|
  puts "#{payload} was returned! reply_code = #{basic_return.reply_code}, reply_text = #{basic_return.reply_text}"
end

exchange.publish("San Diego update", :mandatory => true, :routing_key => "americas.north.us.ca.sandiego").
  publish("Berkeley update",         :mandatory => true, :routing_key => "americas.north.us.ca.berkeley").
  publish("San Francisco update",    :mandatory => true, :routing_key => "americas.north.us.ca.sanfrancisco").
  publish("New York update",         :mandatory => true, :routing_key => "americas.north.us.ny.newyork").
  publish("São Paolo update",        :mandatory => true, :routing_key => "americas.south.brazil.saopaolo").
  publish("Hong Kong update",        :mandatory => true, :routing_key => "asia.southeast.hk.hongkong").
  publish("Kyoto update",            :mandatory => true, :routing_key => "asia.southeast.japan.kyoto").
  publish("Shanghai update",         :mandatory => true, :routing_key => "asia.southeast.prc.shanghai").
  publish("Rome update",             :mandatory => true, :routing_key => "europe.italy.roma").
  publish("Paris update",            :mandatory => true, :routing_key => "europe.france.paris")

channel.close
sleep 5

quit = true
consumer.join
connection.close
输出
consumer start
San Diego update was returned! reply_code = 312, reply_text = NO_ROUTE
Berkeley update was returned! reply_code = 312, reply_text = NO_ROUTE
San Francisco update was returned! reply_code = 312, reply_text = NO_ROUTE
New York update was returned! reply_code = 312, reply_text = NO_ROUTE
São Paolo update was returned! reply_code = 312, reply_text = NO_ROUTE
Hong Kong update was returned! reply_code = 312, reply_text = NO_ROUTE
Kyoto update was returned! reply_code = 312, reply_text = NO_ROUTE
Shanghai update was returned! reply_code = 312, reply_text = NO_ROUTE
Rome update was returned! reply_code = 312, reply_text = NO_ROUTE
Paris update was returned! reply_code = 312, reply_text = NO_ROUTE
consumer done

正如我们所看到的,所有消息最终都以NO_ROUTE.

在发布消息之前强制队列(和路由)存在的简单解决方案:

#!/usr/bin/env ruby
require "bunny"

quit = false
consumer_queued = false

connection = Bunny.new
connection.start

consumer = Thread.new do
  puts "consumer start"
  cn  = connection.create_channel
  ex = cn.topic("weathr", :auto_delete => true)

  q = cn.queue("", :exclusive => true).bind(ex, :routing_key => "americas.north.#")
  consumer_queued = true
  q.subscribe do |delivery_info, properties, payload|
    puts "An update for North America: #{payload}, routing key is #{delivery_info.routing_key}"
    $stdout.flush
  end

  sleep 1 while !quit

  cn.close
  puts "consumer done"
end

# ensure queue is ready
sleep 0.125  while !consumer_queued

channel = connection.create_channel
exchange = channel.topic("weathr", :auto_delete => true)
exchange.on_return do |basic_return, properties, payload|
  puts "#{payload} was returned! reply_code = #{basic_return.reply_code}, reply_text = #{basic_return.reply_text}"
  $stdout.flush
end

exchange.publish("San Diego update", :mandatory => true, :routing_key => "americas.north.us.ca.sandiego").
  publish("Berkeley update",         :mandatory => true, :routing_key => "americas.north.us.ca.berkeley").
  publish("San Francisco update",    :mandatory => true, :routing_key => "americas.north.us.ca.sanfrancisco").
  publish("New York update",         :mandatory => true, :routing_key => "americas.north.us.ny.newyork").
  publish("São Paolo update",        :mandatory => true, :routing_key => "americas.south.brazil.saopaolo").
  publish("Hong Kong update",        :mandatory => true, :routing_key => "asia.southeast.hk.hongkong").
  publish("Kyoto update",            :mandatory => true, :routing_key => "asia.southeast.japan.kyoto").
  publish("Shanghai update",         :mandatory => true, :routing_key => "asia.southeast.prc.shanghai").
  publish("Rome update",             :mandatory => true, :routing_key => "europe.italy.roma").
  publish("Paris update",            :mandatory => true, :routing_key => "europe.france.paris")

channel.close
sleep 5

quit = true
consumer.join
connection.close
输出(带退货通知)
consumer start
An update for North America: San Diego update, routing key is americas.north.us.ca.sandiego
São Paolo update was returned! reply_code = 312, reply_text = NO_ROUTE
An update for North America: Berkeley update, routing key is americas.north.us.ca.berkeley
Hong Kong update was returned! reply_code = 312, reply_text = NO_ROUTE
An update for North America: San Francisco update, routing key is americas.north.us.ca.sanfrancisco
Kyoto update was returned! reply_code = 312, reply_text = NO_ROUTE
An update for North America: New York update, routing key is americas.north.us.ny.newyork
Shanghai update was returned! reply_code = 312, reply_text = NO_ROUTE
Rome update was returned! reply_code = 312, reply_text = NO_ROUTE
Paris update was returned! reply_code = 312, reply_text = NO_ROUTE
consumer done

收到预期的消息并返回其余消息。

于 2018-02-08T17:55:15.163 回答