出于与本讨论中类似的原因,我正在尝试使用消息传递代替 REST,以实现从一个 Rails 3 应用程序到另一个应用程序的同步 RPC 调用。这两个应用程序都在精简运行。
“服务器”应用程序有一个config/initializers/amqp.rb
基于rubyamqp.info 文档中的请求/回复模式的文件:
require "amqp"
EventMachine.next_tick do
connection = AMQP.connect ENV['CLOUDAMQP_URL'] || 'amqp://guest:guest@localhost'
channel = AMQP::Channel.new(connection)
requests_queue = channel.queue("amqpgem.examples.services.time", :exclusive => true, :auto_delete => true)
requests_queue.subscribe(:ack => true) do |metadata, payload|
puts "[requests] Got a request #{metadata.message_id}. Sending a reply..."
channel.default_exchange.publish(Time.now.to_s,
:routing_key => metadata.reply_to,
:correlation_id => metadata.message_id,
:mandatory => true)
metadata.ack
end
Signal.trap("INT") { connection.close { EventMachine.stop } }
end
在“客户端”应用程序中,我想在视图中呈现对“服务器”的同步调用的结果。我意识到这有点超出了像 amqp gem 这样的固有异步库的舒适区,但我想知道是否有办法让它工作。这是我的客户config/initializers/amqp.rb
:
require 'amqp'
EventMachine.next_tick do
AMQP.connection = AMQP.connect 'amqp://guest:guest@localhost'
Signal.trap("INT") { AMQP.connection.close { EventMachine.stop } }
end
这是控制器:
require "amqp"
class WelcomeController < ApplicationController
def index
puts "[request] Sending a request..."
WelcomeController.channel.default_exchange.publish("get.time",
:routing_key => "amqpgem.examples.services.time",
:message_id => Kernel.rand(10101010).to_s,
:reply_to => WelcomeController.replies_queue.name)
WelcomeController.replies_queue.subscribe do |metadata, payload|
puts "[response] Response for #{metadata.correlation_id}: #{payload.inspect}"
@message = payload.inspect
end
end
def self.channel
@channel ||= AMQP::Channel.new(AMQP.connection)
end
def self.replies_queue
@replies_queue ||= channel.queue("reply", :exclusive => true, :auto_delete => true)
end
end
当我在不同的端口上启动两个应用程序并访问welcome#index
视图时。
@message
在视图中为 nil,因为尚未返回结果。结果在视图渲染后几毫秒到达并显示在控制台上:
$ thin start
>> Using rack adapter
>> Thin web server (v1.5.0 codename Knife)
>> Maximum connections set to 1024
>> Listening on 0.0.0.0:3000, CTRL+C to stop
[request] Sending a request...
[response] Response for 3877031: "2012-11-27 22:04:28 -0600"
这里毫不奇怪:subscribe
显然不适合同步调用。令人惊讶的是,我在 AMQP gem 源代码或任何在线文档中都找不到同步替代方案。是否有替代方案subscribe
可以提供我想要的 RPC 行为?鉴于系统的其他部分我想使用合法的异步调用,兔子 gem似乎不是适合这项工作的工具。我应该再看看吗?
编辑以回应 Sam Stokes
感谢 Sam 提供了 throw :async / async.callback 的指针。我以前从未见过这种技术,而这正是我最初试图通过这个实验学习的东西。send_response.finish
在 Rails 3 中消失了,但我能够让他的示例至少为一个请求工作,只需稍作改动:
render :text => @message
rendered_response = response.prepare!
后续请求失败并显示!! Unexpected error while processing request: deadlock; recursive locking
. 这可能是 Sam 关于让 ActionController 允许并发请求的评论所得到的,但引用的要点仅适用于 Rails 2。添加config.allow_concurrency = true
development.rb 可以消除 Rails 3 中的这个错误,但会导致This queue already has default consumer.
来自 AMQP。
我认为这头牦牛已经刮得很干净了。;-)
虽然很有趣,但这对于简单的 RPC 来说显然是多余的。像这样的Sinatra 流示例似乎更适合客户端与回复交互的用例。Tenderlove 也有一篇博客文章,介绍了即将在 Rails 4 中流式传输事件的方法,该方法可以与 AMQP 一起使用。
正如 Sam 在他对 HTTP 替代方案的讨论中指出的那样,REST / HTTP 对于我的系统中涉及两个 Rails 应用程序的 RPC 部分非常有意义。系统的其他部分涉及到 Clojure 应用程序的更经典的异步事件发布。对于这些,Rails 应用程序只需要以即发即弃的方式发布事件,因此 AMQP 可以在没有回复队列的情况下使用我的原始代码在那里正常工作。