我正在尝试使用 rabbitmq 构建 RPC。
根据通过rabbitmq构建RPC的教程http://www.rabbitmq.com/tutorials/tutorial-six-ruby.html,我们可以为每个客户端使用一个reply-queue,并使用correlation_id来映射响应和请求。我对如何使用correlation_id 感到困惑?
这是我正在运行的问题,我想从一个客户端同步创建两个 rpc 调用,使用具有两个不同相关 id 的相同回复队列。但是我不知道这是否是正确的用例,因为从我在教程中阅读的内容来看,它似乎假设每个客户端都在按顺序进行 rpc 调用。(在这种情况下,我们为什么需要correlation_id在这里变得更加混乱)。
这是我想要实现的代码示例(rpc_server.rb 将与教程相同)。希望它能让我的问题更清楚。
下面的代码块不起作用,因为当我们在 thr1 中设置时,correlation_id 被 thr2 覆盖。
我想知道是否有任何修改它,使其工作?如果我们尝试将@reply_queue.subscribe 块移出初始化并传入不同的call_id,它仍然不起作用,因为看起来@reply-queue 在等待thr1 完成时将被锁定。
如果问题不清楚,请告诉我,提前感谢您的任何回复。
#!/usr/bin/env ruby
# encoding: utf-8
require "bunny"
require "thread"
conn = Bunny.new(:automatically_recover => false)
conn.start
ch = conn.create_channel
class FibonacciClient
attr_reader :reply_queue
attr_accessor :response, :call_id
attr_reader :lock, :condition
def initialize(ch, server_queue)
@ch = ch
@x = ch.default_exchange
@server_queue = server_queue
@reply_queue = ch.queue("", :exclusive => true)
@lock = Mutex.new
@condition = ConditionVariable.new
that = self
@reply_queue.subscribe do |delivery_info, properties, payload|
if properties[:correlation_id] == that.call_id
that.response = payload.to_i
that.lock.synchronize{that.condition.signal}
end
end
end
def call(n)
self.call_id = self.generate_uuid
@x.publish(n.to_s,
:routing_key => @server_queue,
:correlation_id => call_id,
:reply_to => @reply_queue.name)
lock.synchronize{condition.wait(lock)}
response
end
protected
def generate_uuid
# very naive but good enough for code
# examples
"#{rand}#{rand}#{rand}"
end
end
client = FibonacciClient.new(ch, "rpc_queue")
thr1 = Thread.new{
response1 = client.call(30)
puts response1
}
thr2 = Thread.new{
response2 = client.call(40)
puts response2
}
ch.close
conn.close