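I have a worker subscribed to two queues, "low" and "high". I want the worker to process messages from the low-priority queue only when the high-priority queue is empty.
I am trying to do this by defining two channels and setting a higher prefetch value on the high-priority queue's channel, as described here: http://dougbarth.github.io/2011/07/01/approximating-priority-with-rabbitmq.html
Here is my worker code: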
require "rubygems"
require "amqp"

def slow_task
  # Do some slow work
  sleep(1)
end

EventMachine.run do
  connection = AMQP.connect(:host => '127.0.0.1')
  channel_low = AMQP::Channel.new(connection)
  channel_high = AMQP::Channel.new(connection)

  # Attempting to set the prefetch higher on the high priority queue
  channel_low.prefetch(10)
  channel_high.prefetch(20)

  low_queue = channel_low.queue("low", :auto_delete => false)
  high_queue = channel_high.queue("high", :auto_delete => false)

  low_queue.subscribe do |payload|
    puts "#{payload}"
    slow_task
  end

  high_queue.subscribe do |payload|
    puts "#{payload}"
    slow_task
  end
end
When I run the following client against it, I do not see the high-priority messages being processed first:
require "rubygems"
require "amqp"

EventMachine.run do
  connection = AMQP.connect(:host => '127.0.0.1')
  channel = AMQP::Channel.new(connection)
  low_queue = channel.queue("low")
  high_queue = channel.queue("high")
  exchange = channel.direct("")

  10.times do |i|
    message = "LOW #{i}"
    puts "sending: #{message}"
    exchange.publish message, :routing_key => low_queue.name
  end

  # EventMachine.add_periodic_timer(0.0001) do
  10.times do |i|
    message = "HIGH #{i}"
    puts "sending: #{message}"
    exchange.publish message, :routing_key => high_queue.name
  end
end
Output:
Client >>>
sending: LOW 0
sending: LOW 1
sending: LOW 2
sending: LOW 3
sending: LOW 4
sending: LOW 5
sending: LOW 6
sending: LOW 7
sending: LOW 8
sending: LOW 9
sending: HIGH 0
sending: HIGH 1
sending: HIGH 2
sending: HIGH 3
sending: HIGH 4
sending: HIGH 5
sending: HIGH 6
sending: HIGH 7
sending: HIGH 8
sending: HIGH 9
Server >>>
HIGH 0
HIGH 1
LOW 0
LOW 1
LOW 2
HIGH 2
LOW 3
LOW 4
LOW 5
LOW 6
LOW 7
HIGH 3
LOW 8
LOW 9
HIGH 4
HIGH 5
HIGH 6
HIGH 7
HIGH 8
HIGH 9
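
To make the ordering I am after concrete, here is a minimal broker-free sketch of the strict-priority behaviour I expected. It is only an illustration: the method name `drain_by_priority` is hypothetical, plain arrays stand in for the "low" and "high" queues, and nothing in it uses the amqp gem or RabbitMQ.

```ruby
# Illustration only: plain arrays stand in for the "high" and "low" queues.
# drain_by_priority is a hypothetical helper, not part of the amqp gem.
def drain_by_priority(high, low)
  processed = []
  until high.empty? && low.empty?
    # Take from the low queue only when the high queue is empty.
    processed << (high.empty? ? low.shift : high.shift)
  end
  processed
end

high = Array.new(10) { |i| "HIGH #{i}" }
low  = Array.new(10) { |i| "LOW #{i}" }
puts drain_by_priority(high, low)
```

With all 20 messages already queued, as in my test, this prints every HIGH message before any LOW one, which is the order I was hoping to see in the Server output.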