2

鉴于我有一个工作人员订阅了两个队列“低”和“高”,我希望工作人员仅在高优先级队列为空时处理来自低优先级队列的消息。

我试图通过定义两个通道并将预取设置为更高优先级队列上的更高值来做到这一点,如下所示:http: //dougbarth.github.io/2011/07/01/approximating-priority-with -rabbitmq.html

这是我的工人代码:

require "rubygems"
require "amqp"

EventMachine.run do
  connection = AMQP.connect(:host => '127.0.0.1')
  channel_low  = AMQP::Channel.new(connection)
  channel_high  = AMQP::Channel.new(connection)

  # Attempting to set the prefetch higher on the high priority queue
  channel_low.prefetch(10)
  channel_high.prefetch(20)

  low_queue    = channel_low.queue("low", :auto_delete => false)
  high_queue    = channel_high.queue("high", :auto_delete => false)

  low_queue.subscribe do |payload|
    puts "#{payload}"
    slow_task
  end

  high_queue.subscribe do |payload|
    puts "#{payload}"
    slow_task
  end

  def slow_task
    # Do some slow work
    sleep(1)
  end
end

当我针对它运行此客户端时,我没有看到首先处理的高优先级消息:

require "rubygems"
require "amqp"

EventMachine.run do
  connection = AMQP.connect(:host => '127.0.0.1')
  channel  = AMQP::Channel.new(connection)

  low_queue    = channel.queue("low")
  high_queue    = channel.queue("high")
  exchange = channel.direct("")

  10.times do |i| 
    message = "LOW #{i}"
    puts "sending: #{message}"
    exchange.publish message, :routing_key => low_queue.name
  end

  # EventMachine.add_periodic_timer(0.0001) do
  10.times do |i|
    message = "HIGH #{i}"
    puts "sending: #{message}"
    exchange.publish message, :routing_key => high_queue.name
  end

end

输出:

Client >>>
sending: LOW 0
sending: LOW 1
sending: LOW 2
sending: LOW 3
sending: LOW 4
sending: LOW 5
sending: LOW 6
sending: LOW 7
sending: LOW 8
sending: LOW 9
sending: HIGH 0
sending: HIGH 1
sending: HIGH 2
sending: HIGH 3
sending: HIGH 4
sending: HIGH 5
sending: HIGH 6
sending: HIGH 7
sending: HIGH 8
sending: HIGH 9

Server >>>

HIGH 0
HIGH 1
LOW 0
LOW 1
LOW 2
HIGH 2
LOW 3
LOW 4
LOW 5
LOW 6
LOW 7
HIGH 3
LOW 8
LOW 9
HIGH 4
HIGH 5
HIGH 6
HIGH 7
HIGH 8
HIGH 9
4

4 回答 4

2

就像迈克尔说的那样,您当前的方法存在一些问题:

  • 不启用显式确认意味着 RabbitMQ 在发送消息时考虑消息传递,而不是在您处理它们时
  • 您的消息是如此之小,以至于它们能够通过网络快速传递
  • 当 EventMachine 读取网络数据时调用您的 subscribe 块,对于从套接字读取的每个完整数据帧一次
  • 最后,阻塞反应器线程(使用睡眠)将阻止 EM 将确认发送到套接字,因此无法实现正确的行为。

为了实现优先级概念,您需要将接收网络数据与确认网络数据分开。在我们的应用程序(我写过博客文章)中,我们使用了一个后台线程和一个优先级队列来重新排序进入的工作。这在每个工作人员中引入了一个小的消息缓冲区。其中一些消息可能是低优先级的消息,在没有更高优先级的消息可以处理之前,它们不会被处理。

这是一个稍微修改的工作代码,它使用工作线程和优先级队列来获得所需的结果。

require "rubygems"
require "amqp"
require "pqueue"

EventMachine.run do
  connection = AMQP.connect(:host => '127.0.0.1')
  channel_low  = AMQP::Channel.new(connection)
  channel_high  = AMQP::Channel.new(connection)

  # Attempting to set the prefetch higher on the high priority queue
  channel_low.prefetch(10)
  channel_high.prefetch(20)

  low_queue    = channel_low.queue("low", :auto_delete => false)
  high_queue    = channel_high.queue("high", :auto_delete => false)

  # Our priority queue for buffering messages in the worker's memory
  to_process = PQueue.new {|a,b| a[0] > b[0] }

  # The pqueue gem isn't thread safe
  mutex = Mutex.new

  # Background thread for working blocking operation. We can spin up more of
  # these to increase concurrency.
  Thread.new do
    loop do
      _, header, payload = mutex.synchronize { to_process.pop }

      if payload
        puts "#{payload}"
        slow_task
        # We need to call ack on the EM thread.
        EM.next_tick { header.ack }
      else
        sleep(0.1)
      end
    end
  end

  low_queue.subscribe(:ack => true) do |header, payload|
    mutex.synchronize { to_process << [0, header, payload] }
  end

  high_queue.subscribe(:ack => true) do |header, payload|
    mutex.synchronize { to_process << [10, header, payload] }
  end

  def slow_task
    # Do some slow work
    sleep(1)
  end
end

如果需要增加并发性,可以生成多个后台线程。

于 2013-06-06T20:50:58.393 回答
1

您的方法是最常用的解决方法之一。但是,发布的具体示例中存在一些问题。

预取不控制哪个通道具有交付优先级。它控制有多少消息可以在其上“进行中”(未确认)。

这可以用作穷人的优先级技术,但是,您使用自动确认模式,因此预取不会发挥作用(RabbitMQ 立即认为您的消息一经发出就已确认)。

如果您只发布少量消息,然后您的示例完成运行,则排序很可能很大程度上取决于您发布消息的顺序。

要通过手动确认查看预取设置的效果,您需要运行它更长的时间(这取决于您的消息速率,但至少一分钟。

于 2013-06-06T10:46:17.150 回答
0

您正在向处理器发布小消息,该处理器可能会在它们到达后的几毫秒内处理它们。我怀疑你在这里看到的是样本噪音,而不是你真正认为你看到的。

您能否验证消息发布的顺序是否符合您的预期?

于 2013-06-05T12:34:41.803 回答
0

你能试试这样的吗。我不确定这是否会产生您需要的东西,但值得一试。

10.times do 
   10.times do 
      exchange.publish "HIGH", :routing_key => high_queue.name
    end  
   exchange.publish "LOW", :routing_key => low_queue.name
end
于 2013-06-05T11:53:32.523 回答