5

出于与本讨论中类似的原因,我正在尝试使用消息传递代替 REST,以实现从一个 Rails 3 应用程序到另一个应用程序的同步 RPC 调用。这两个应用程序都在精简运行。

“服务器”应用程序有一个config/initializers/amqp.rb基于ruby​​amqp.info 文档中的请求/回复模式的文件:

require "amqp"

EventMachine.next_tick do
  connection = AMQP.connect ENV['CLOUDAMQP_URL'] || 'amqp://guest:guest@localhost'
  channel    = AMQP::Channel.new(connection)

  requests_queue = channel.queue("amqpgem.examples.services.time", :exclusive => true, :auto_delete => true)
  requests_queue.subscribe(:ack => true) do |metadata, payload|
    puts "[requests] Got a request #{metadata.message_id}. Sending a reply..."
    channel.default_exchange.publish(Time.now.to_s,
                                     :routing_key    => metadata.reply_to,
                                     :correlation_id => metadata.message_id,
                                     :mandatory      => true)
    metadata.ack
  end

  Signal.trap("INT") { connection.close { EventMachine.stop } }
end

在“客户端”应用程序中,我想在视图中呈现对“服务器”的同步调用的结果。我意识到这有点超出了像 amqp gem 这样的固有异步库的舒适区,但我想知道是否有办法让它工作。这是我的客户config/initializers/amqp.rb

require 'amqp'

EventMachine.next_tick do
  AMQP.connection = AMQP.connect 'amqp://guest:guest@localhost'  
  Signal.trap("INT") { AMQP.connection.close { EventMachine.stop } }
end

这是控制器:

require "amqp"

class WelcomeController < ApplicationController
  def index    
    puts "[request] Sending a request..."

    WelcomeController.channel.default_exchange.publish("get.time",
      :routing_key => "amqpgem.examples.services.time",
      :message_id  => Kernel.rand(10101010).to_s,
      :reply_to    => WelcomeController.replies_queue.name)

    WelcomeController.replies_queue.subscribe do |metadata, payload|
      puts "[response] Response for #{metadata.correlation_id}: #{payload.inspect}"
      @message = payload.inspect
    end   
  end

  def self.channel
    @channel ||= AMQP::Channel.new(AMQP.connection)
  end

  def self.replies_queue
    @replies_queue ||= channel.queue("reply", :exclusive => true, :auto_delete => true)
  end
end

当我在不同的端口上启动两个应用程序并访问welcome#index视图时。 @message在视图中为 nil,因为尚未返回结果。结果在视图渲染后几毫秒到达并显示在控制台上:

$ thin start
>> Using rack adapter
>> Thin web server (v1.5.0 codename Knife)
>> Maximum connections set to 1024
>> Listening on 0.0.0.0:3000, CTRL+C to stop
[request] Sending a request...
[response] Response for 3877031: "2012-11-27 22:04:28 -0600"

这里毫不奇怪:subscribe显然不适合同步调用。令人惊讶的是,我在 AMQP gem 源代码或任何在线文档中都找不到同步替代方案。是否有替代方案subscribe可以提供我想要的 RPC 行为?鉴于系统的其他部分我想使用合法的异步调用,兔子 gem似乎不是适合这项工作的工具。我应该再看看吗?

编辑以回应 Sam Stokes

感谢 Sam 提供了 throw :async / async.callback 的指针。我以前从未见过这种技术,而这正是我最初试图通过这个实验学习的东西。send_response.finish在 Rails 3 中消失了,但我能够让他的示例至少为一个请求工作,只需稍作改动:

render :text => @message
rendered_response = response.prepare!

后续请求失败并显示!! Unexpected error while processing request: deadlock; recursive locking. 这可能是 Sam 关于让 ActionController 允许并发请求的评论所得到的,但引用的要点仅适用于 Rails 2。添加config.allow_concurrency = truedevelopment.rb 可以消除 Rails 3 中的这个错误,但会导致This queue already has default consumer.来自 AMQP。

我认为这头牦牛已经刮得很干净了。;-)

虽然很有趣,但这对于简单的 RPC 来说显然是多余的。像这样的Sinatra 流示例似乎更适合客户端与回复交互的用例。Tenderlove 也有一篇博客文章,介绍了即将在 Rails 4 中流式传输事件的方法,该方法可以与 AMQP 一起使用。

正如 Sam 在他对 HTTP 替代方案的讨论中指出的那样,REST / HTTP 对于我的系统中涉及两个 Rails 应用程序的 RPC 部分非常有意义。系统的其他部分涉及到 Clojure 应用程序的更经典的异步事件发布。对于这些,Rails 应用程序只需要以即发即弃的方式发布事件,因此 AMQP 可以在没有回复队列的情况下使用我的原始代码在那里正常工作。

4

1 回答 1

5

您可以获得您想要的行为 - 让客户端发出一个简单的 HTTP 请求,您的 Web 应用程序会异步响应该请求 - 但您需要更多技巧。您需要使用 Thin 对异步响应的支持:

require "amqp"

class WelcomeController < ApplicationController
  def index
    puts "[request] Sending a request..."

    WelcomeController.channel.default_exchange.publish("get.time",
      :routing_key => "amqpgem.examples.services.time",
      :message_id  => Kernel.rand(10101010).to_s,
      :reply_to    => WelcomeController.replies_queue.name)

    WelcomeController.replies_queue.subscribe do |metadata, payload|
      puts "[response] Response for #{metadata.correlation_id}: #{payload.inspect}"
      @message = payload.inspect

      # Trigger Rails response rendering now we have the message.
      # Tested in Rails 2.3; may or may not work in Rails 3.x.
      rendered_response = send_response.finish

      # Pass the response to Thin and make it complete the request.
      # env['async.callback'] expects a Rack-style response triple:
      # [status, headers, body]
      request.env['async.callback'].call(rendered_response)
    end

    # This unwinds the call stack, skipping the normal Rails response
    # rendering, all the way back up to Thin, which catches it and
    # interprets as "I'll give you the response later by calling
    # env['async.callback']".
    throw :async
  end

  def self.channel
    @channel ||= AMQP::Channel.new(AMQP.connection)
  end

  def self.replies_queue
    @replies_queue ||= channel.queue("reply", :exclusive => true, :auto_delete => true)
  end
end

就客户端而言,结果与您的 Web 应用程序在返回响应之前阻塞同步调用无法区分;但是现在您的 Web 应用程序可以同时处理许多此类请求。

警告!

Async Rails 是一种高级技术;你需要知道你在做什么。Rails 的某些部分不喜欢突然拆除它们的调用堆栈。将throw绕过任何不知道捕获并重新抛出它的 Rack 中间件(这是一个相当古老的部分解决方案)。ActiveSupport 的开发模式类重新加载将在 之后重新加载您的应用程序的类throw,而无需等待响应,如果您的回调引用已重新加载的类,这可能会导致非常混乱的损坏。您还需要很好地询问 ActionController以允许并发请求。

请求/响应

您还需要匹配请求和响应。就目前而言,如果请求 1 到达,然后请求 2 在请求 1 得到响应之前到达,则不确定哪个请求将接收响应 1(队列上的消息在订阅队列的消费者之间循环分发)。

您可以通过检查correlation_id(顺便说一下,您必须明确设置-RabbitMQ 不会为您这样做!)并重新排队消息(如果它不是您正在等待的响应)来完成此操作。我的方法是创建一个持久的 Publisher 对象,该对象将跟踪打开的请求,侦听所有响应,并根据correlation_id 查找要调用的适当回调。

替代方案:只使用 HTTP

您在这里确实解决了两个不同(而且很棘手!)的问题:说服 Rails/thin 异步处理请求,以及在 AMQP 的发布-订阅模型之上实现请求-响应语义。既然你说这是在两个 Rails 应用程序之间调用,为什么不直接使用 HTTP,它已经具有你需要的请求-响应语义?这样你只需要解决第一个问题。如果您使用非阻塞 HTTP 客户端库,例如em-http-request ,您仍然可以获得并发请求处理。

于 2012-11-28T23:00:42.520 回答