线程的订阅块没有执行,因为队列根本没有收到任何消息。详细地说,在这种情况下,队列最终是在消息发布后创建的。
这可以通过将消息切换到:mandatory => true
并使用来可视化Bunny::Exchange#on_return
:
代码
#!/usr/bin/env ruby
require "bunny"
quit = false
connection = Bunny.new
connection.start
consumer = Thread.new do
puts "consumer start"
cn = connection.create_channel
ex = cn.topic("weathr", :auto_delete => true)
q = cn.queue("", :exclusive => true).bind(ex, :routing_key => "americas.north.#")
q.subscribe do |delivery_info, properties, payload|
puts "An update for North America: #{payload}, routing key is #{delivery_info.routing_key}"
end
sleep 1 while !quit
cn.close
puts "consumer done"
end
channel = connection.create_channel
exchange = channel.topic("weathr", :auto_delete => true)
exchange.on_return do |basic_return, properties, payload|
puts "#{payload} was returned! reply_code = #{basic_return.reply_code}, reply_text = #{basic_return.reply_text}"
end
exchange.publish("San Diego update", :mandatory => true, :routing_key => "americas.north.us.ca.sandiego").
publish("Berkeley update", :mandatory => true, :routing_key => "americas.north.us.ca.berkeley").
publish("San Francisco update", :mandatory => true, :routing_key => "americas.north.us.ca.sanfrancisco").
publish("New York update", :mandatory => true, :routing_key => "americas.north.us.ny.newyork").
publish("São Paolo update", :mandatory => true, :routing_key => "americas.south.brazil.saopaolo").
publish("Hong Kong update", :mandatory => true, :routing_key => "asia.southeast.hk.hongkong").
publish("Kyoto update", :mandatory => true, :routing_key => "asia.southeast.japan.kyoto").
publish("Shanghai update", :mandatory => true, :routing_key => "asia.southeast.prc.shanghai").
publish("Rome update", :mandatory => true, :routing_key => "europe.italy.roma").
publish("Paris update", :mandatory => true, :routing_key => "europe.france.paris")
channel.close
sleep 5
quit = true
consumer.join
connection.close
输出
consumer start
San Diego update was returned! reply_code = 312, reply_text = NO_ROUTE
Berkeley update was returned! reply_code = 312, reply_text = NO_ROUTE
San Francisco update was returned! reply_code = 312, reply_text = NO_ROUTE
New York update was returned! reply_code = 312, reply_text = NO_ROUTE
São Paolo update was returned! reply_code = 312, reply_text = NO_ROUTE
Hong Kong update was returned! reply_code = 312, reply_text = NO_ROUTE
Kyoto update was returned! reply_code = 312, reply_text = NO_ROUTE
Shanghai update was returned! reply_code = 312, reply_text = NO_ROUTE
Rome update was returned! reply_code = 312, reply_text = NO_ROUTE
Paris update was returned! reply_code = 312, reply_text = NO_ROUTE
consumer done
正如我们所看到的,所有消息最终都以NO_ROUTE
.
在发布消息之前强制队列(和路由)存在的简单解决方案:
#!/usr/bin/env ruby
require "bunny"
quit = false
consumer_queued = false
connection = Bunny.new
connection.start
consumer = Thread.new do
puts "consumer start"
cn = connection.create_channel
ex = cn.topic("weathr", :auto_delete => true)
q = cn.queue("", :exclusive => true).bind(ex, :routing_key => "americas.north.#")
consumer_queued = true
q.subscribe do |delivery_info, properties, payload|
puts "An update for North America: #{payload}, routing key is #{delivery_info.routing_key}"
$stdout.flush
end
sleep 1 while !quit
cn.close
puts "consumer done"
end
# ensure queue is ready
sleep 0.125 while !consumer_queued
channel = connection.create_channel
exchange = channel.topic("weathr", :auto_delete => true)
exchange.on_return do |basic_return, properties, payload|
puts "#{payload} was returned! reply_code = #{basic_return.reply_code}, reply_text = #{basic_return.reply_text}"
$stdout.flush
end
exchange.publish("San Diego update", :mandatory => true, :routing_key => "americas.north.us.ca.sandiego").
publish("Berkeley update", :mandatory => true, :routing_key => "americas.north.us.ca.berkeley").
publish("San Francisco update", :mandatory => true, :routing_key => "americas.north.us.ca.sanfrancisco").
publish("New York update", :mandatory => true, :routing_key => "americas.north.us.ny.newyork").
publish("São Paolo update", :mandatory => true, :routing_key => "americas.south.brazil.saopaolo").
publish("Hong Kong update", :mandatory => true, :routing_key => "asia.southeast.hk.hongkong").
publish("Kyoto update", :mandatory => true, :routing_key => "asia.southeast.japan.kyoto").
publish("Shanghai update", :mandatory => true, :routing_key => "asia.southeast.prc.shanghai").
publish("Rome update", :mandatory => true, :routing_key => "europe.italy.roma").
publish("Paris update", :mandatory => true, :routing_key => "europe.france.paris")
channel.close
sleep 5
quit = true
consumer.join
connection.close
输出(带退货通知)
consumer start
An update for North America: San Diego update, routing key is americas.north.us.ca.sandiego
São Paolo update was returned! reply_code = 312, reply_text = NO_ROUTE
An update for North America: Berkeley update, routing key is americas.north.us.ca.berkeley
Hong Kong update was returned! reply_code = 312, reply_text = NO_ROUTE
An update for North America: San Francisco update, routing key is americas.north.us.ca.sanfrancisco
Kyoto update was returned! reply_code = 312, reply_text = NO_ROUTE
An update for North America: New York update, routing key is americas.north.us.ny.newyork
Shanghai update was returned! reply_code = 312, reply_text = NO_ROUTE
Rome update was returned! reply_code = 312, reply_text = NO_ROUTE
Paris update was returned! reply_code = 312, reply_text = NO_ROUTE
consumer done
收到预期的消息并返回其余消息。