
I am using AMQP / RabbitMQ in my Ruby on Rails application.

I put the following amqp.rb file under config/initializers (copied and adapted from the recipe at http://www.hiringthing.com/2011/11/04/eventmachine-with-rails.html#sthash.iqCWUtOn.dpbs):

require 'amqp'

# References:
#   1. Getting Started with AMQP and Ruby
#      http://rubyamqp.info/articles/getting_started/
#   2. EventMachine and Rails
#      http://www.hiringthing.com/2011/11/04/eventmachine-with-rails.html#sthash.iqCWUtOn.dpbs
#   3. Connecting to the broker, integrating with Ruby on Rails, Merb and Sinatra
#      http://rubyamqp.info/articles/connecting_to_broker/
module AppEventMachine
  def self.start
    if defined?(PhusionPassenger)
      Rails.logger.info "###############################################################################"
      Rails.logger.info "Running EventMachine/Rails with PhusionPassenger ......"
      Rails.logger.info "###############################################################################"
      PhusionPassenger.on_event(:starting_worker_process) do |forked|
      # =>  for passenger, we need to avoid orphaned threads
        if forked && EventMachine.reactor_running?
          EventMachine.stop
        end

        spawn_eventmachine_thread
        die_gracefully_on_signal
      end
    else
      Rails.logger.info "###############################################################################"
      Rails.logger.info "PhusionPassenger is not running.  Probably you are running Rails locally ......"
      Rails.logger.info "###############################################################################"

      # facilitates debugging
      Thread.abort_on_exception = true
      # just spawn a thread and start it up
      spawn_eventmachine_thread unless defined?(Thin)
      # Thin is built on EventMachine, doesn't need this thread
    end
  end

  def self.spawn_eventmachine_thread
    Thread.new {
      EventMachine.run do
        AMQP.channel ||= AMQP::Channel.new(AMQP.connect(:host => '127.0.0.1')) # Q_SERVER, :user=> Q_USER, :pass => Q_PASS, :vhost => Q_VHOST ))
        AMQP.channel.on_error(&method(:handle_channel_exception))
        AMQP.channel.queue(MixpanelJob::QUEUE_NAME, :exclusive => true)
                    .subscribe { |metadata, payload| MixpanelJob::handle_sending(metadata, payload) }
      end
    }
  end

  def self.handle_channel_exception(channel, channel_close)
    Rails.logger.error "###############################################################################"
    Rails.logger.error "Oops... a channel-level exception: code = #{channel_close.reply_code}, message = #{channel_close.reply_text}"
    Rails.logger.error "###############################################################################"
  end

  def self.die_gracefully_on_signal
    Signal.trap("INT") {
      Rails.logger.error "###############################################################################"
      Rails.logger.error "Stopping the EventMachine ......"
      EventMachine.stop
      Rails.logger.error "###############################################################################"
    }
    Signal.trap("TERM") {
      Rails.logger.error "###############################################################################"
      Rails.logger.error "Stopping the EventMachine ......"
      EventMachine.stop
      Rails.logger.error "###############################################################################"
    }
  end
end

AppEventMachine.start

After starting Rails with PhusionPassenger, I can see that it is running under PhusionPassenger. I then tried sending messages to the queue, but to my surprise:

.subscribe { |metadata, payload| MixpanelJob::handle_sending(metadata, payload) }

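the subscribe handler is executed only once. Only the first message is received; for any later message (the 2nd, the 3rd, and so on) the subscribe handler is never called.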


1 Answer


Thanks to Joshua for the comment. It turned out that MixpanelJob::handle_sending was failing, and that blocked the EventMachine thread.
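One way to keep a failing handler from taking the consumer down with it is to rescue inside the handler and log the error. The following is only a minimal sketch using the Ruby standard library: SafeHandler is a hypothetical helper (it is not part of the amqp gem or EventMachine), and the plain loop at the end merely stands in for message delivery on the reactor thread.

```ruby
require 'logger'
require 'stringio'

# Hypothetical helper, not part of amqp or EventMachine: wraps a message
# handler so that an exception is logged instead of propagating to the caller.
module SafeHandler
  def self.wrap(logger, &handler)
    lambda do |metadata, payload|
      begin
        handler.call(metadata, payload)
        true   # handler finished without raising
      rescue StandardError => e
        logger.error("handler failed: #{e.class}: #{e.message}")
        false  # failure was logged; the caller keeps running
      end
    end
  end
end

# Simulated delivery of three messages; the second one makes the handler raise.
log_output = StringIO.new
processed  = []

handler = SafeHandler.wrap(Logger.new(log_output)) do |_metadata, payload|
  raise ArgumentError, "bad payload" if payload == "second"
  processed << payload
end

results = ["first", "second", "third"].map { |payload| handler.call(nil, payload) }
```

In the initializer above, the wrapped handler could be built once with Rails.logger and then called from inside the subscribe block in place of the direct MixpanelJob::handle_sending call. This only covers a handler that raises. If the handler hangs or runs for a long time, moving the work off the reactor thread (for example with EventMachine.defer) would be a separate step.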

Answered 2013-07-31T16:51:52.017