0

有什么方法可以暂时暂停消费者并在以后恢复它?

这是我想做的一个例子:

require "bunny"
conn = Bunny.new
conn.start

ch1 = conn.create_channel
publisher = ch.direct('test', :auto_delete => false)

consumer1 = nil
Thread.new do
    ch2 = conn.create_channel(nil, 8) #Using eight worker
    queue1 = ch2.queue('', :exclusive => true)
    queue1.bind(publisher, :routing_key => 'low_priority')
    consumer1 = queue1.subscribe(:block => true) do |delivery_info, properties, payload|
       #do some work
    end
end

Thread.new do
    ch3 = conn.create_channel
    queue2 = ch3.queue('', :exclusive => true)
    queue2.bind(publisher, :routing_key => 'high_priority')
    consumer2 = queue2.subscribe(:block => true) do |delivery_info, properties, payload|
        consumer1.pause   #pause the other consumer
        #do other things
        consumer1.resume  #resume the consumer
    end
end
#rest of the code

当我在消费者 2 处工作时,我想暂停消费者 1。有什么有效的方法可以做到这一点吗?

4

1 回答 1

0

如果你想创建优先级策略,bunny 已经实现了这个:

http://rubybunny.info/articles/queues.html#consumer_priorities

q = ch.queue("a.queue")
q.subscribe(:manual_ack => true, :arguments => {"x-priority" => 5}) do |delivery_info, properties, payload|
  # ...
end
q.subscribe(:manual_ack => true, :arguments => {"x-priority" => 2}) do |delivery_info, properties, payload|
  # ...
end

如果你想并行处理它,我建议使用这个 gem:

https://github.com/grosser/parallel

例子:

results = Parallel.map(['a','b','c'], in_processes: 3) { |one_letter| ... }

希望能帮助到你。

于 2016-04-24T21:33:18.147 回答