I am using AMQP / RabbitMQ in my Ruby on Rails application.

I put the following amqp.rb file in config/initializers (copied and adapted from the recipe at http://www.hiringthing.com/2011/11/04/eventmachine-with-rails.html#sthash.iqCWUtOn.dpbs):

require 'amqp'

# References:
#   1. Getting Started with AMQP and Ruby
#      http://rubyamqp.info/articles/getting_started/
#   2. EventMachine and Rails
#      http://www.hiringthing.com/2011/11/04/eventmachine-with-rails.html#sthash.iqCWUtOn.dpbs
#   3. Connecting to the broker, integrating with Ruby on Rails, Merb and Sinatra
#      http://rubyamqp.info/articles/connecting_to_broker/
module AppEventMachine
  def self.start
    if defined?(PhusionPassenger)
      Rails.logger.info "###############################################################################"
      Rails.logger.info "Running EventMachine/Rails with PhusionPassenger ......"
      Rails.logger.info "###############################################################################"
      PhusionPassenger.on_event(:starting_worker_process) do |forked|
        # for Passenger, we need to avoid orphaned threads
        if forked && EventMachine.reactor_running?
          EventMachine.stop
        end

        spawn_eventmachine_thread
        die_gracefully_on_signal
      end
    else
      Rails.logger.info "###############################################################################"
      Rails.logger.info "PhusionPassenger is not running.  Probably you are running Rails locally ......"
      Rails.logger.info "###############################################################################"

      # facilitates debugging
      Thread.abort_on_exception = true
      # just spawn a thread and start it up
      spawn_eventmachine_thread unless defined?(Thin)
      # Thin is built on EventMachine, doesn't need this thread
    end
  end

  def self.spawn_eventmachine_thread
    Thread.new {
      EventMachine.run do
        AMQP.channel ||= AMQP::Channel.new(AMQP.connect(:host => '127.0.0.1')) # Q_SERVER, :user=> Q_USER, :pass => Q_PASS, :vhost => Q_VHOST ))
        AMQP.channel.on_error(&method(:handle_channel_exception))
        AMQP.channel.queue(MixpanelJob::QUEUE_NAME, :exclusive => true)
                    .subscribe(&method(:handle_sending))
      end
    }
  end

  def self.handle_sending(metadata, payload)
    puts "handle_sending: ######################"
    Rails.logger.info "Received a message: #{payload}, content_type = #{metadata.content_type}"
    Rails.logger.info "Sending to the Mixpanel server ......"

    # In a worker process on this/another machine

    Rails.logger.info "Sent the message to the Mixpanel server ......"
  end

  def self.handle_channel_exception(channel, channel_close)
    Rails.logger.error "###############################################################################"
    Rails.logger.error "Oops... a channel-level exception: code = #{channel_close.reply_code}, message = #{channel_close.reply_text}"
    Rails.logger.error "###############################################################################"
  end

  def self.die_gracefully_on_signal
    Rails.logger.error "###############################################################################"
    Rails.logger.error "Stopping the EventMachine ......"
    Signal.trap("INT") { EventMachine.stop }
    Signal.trap("TERM") { EventMachine.stop }
    Rails.logger.error "###############################################################################"
  end
end

AppEventMachine.start

The subscription handler handle_sending takes a very long time to respond (several minutes, sometimes more than 10 minutes).
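To put a number on that delay, here is a minimal sketch of how the wait could be measured. It assumes the publisher stamps each message with the time it was sent and that the handler can read that time back; DeliveryLatency and its methods are hypothetical names for illustration, not part of the amqp gem, and the two timestamps below are made-up sample values.

```ruby
# Hypothetical helper (not part of the amqp gem): reports how long a message
# waited between being published and reaching the subscribe handler.
module DeliveryLatency
  # published_at: Time the publisher attached to the message (assumption)
  # received_at:  Time the handler started running (defaults to now)
  def self.seconds(published_at, received_at = Time.now)
    received_at - published_at
  end

  # Formats the latency as a line suitable for a log file.
  def self.describe(published_at, received_at = Time.now)
    format("delivery latency: %.3fs", seconds(published_at, received_at))
  end
end

# Sample values only: a message sent at 12:00:00 and handled at 12:10:30.
sent = Time.utc(2014, 1, 1, 12, 0, 0)
got  = Time.utc(2014, 1, 1, 12, 10, 30)
puts DeliveryLatency.describe(sent, got)
```

Logging this value from inside handle_sending would show whether the time is lost before the message reaches the handler or inside the handler itself.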

Is this normal? Do I need to change any configuration settings in AMQP or RabbitMQ?

Thanks.

Edit

  1. If I run the "Hello World" example from this page (http://rubyamqp.info/articles/getting_started/) in the rails c console, it is very fast.

  2. I have set ulimit -n 1024.

  3. If I run next_tick in rails c as shown below, it is very fast:

=========

require "rubygems"
require "amqp"

EventMachine.run do
  EventMachine.next_tick {
    puts "next_tick triggered"
  }
end

=========
