我正在为我的 Ruby on Rails 应用程序使用 AMQP / RabbitMQ。
我将以下 amqp.rb 文件放在 config/initializers 下:(从配方复制并更改:http ://www.hiringthing.com/2011/11/04/eventmachine-with-rails.html#sthash.iqCWUtOn.dpbs )
require 'amqp'
# References:
# 1. Getting Started with AMQP and Ruby
# http://rubyamqp.info/articles/getting_started/
# 2. EventMachine and Rails
# http://www.hiringthing.com/2011/11/04/eventmachine-with-rails.html#sthash.iqCWUtOn.dpbs
# 3. Connecting to the broker, integrating with Ruby on Rails, Merb and Sinatra
# http://rubyamqp.info/articles/connecting_to_broker/
module AppEventMachine
def self.start
if defined?(PhusionPassenger)
Rails.logger.info "###############################################################################"
Rails.logger.info "Running EventMachine/Rails with PhusionPassenger ......"
Rails.logger.info "###############################################################################"
PhusionPassenger.on_event(:starting_worker_process) do |forked|
# => for passenger, we need to avoid orphaned threads
if forked && EventMachine.reactor_running?
EventMachine.stop
end
spawn_eventmachine_thread
die_gracefully_on_signal
end
else
Rails.logger.info "###############################################################################"
Rails.logger.info "PhusionPassenger is not running. Probably you are running Rails locally ......"
Rails.logger.info "###############################################################################"
# faciliates debugging
Thread.abort_on_exception = true
# just spawn a thread and start it up
spawn_eventmachine_thread unless defined?(Thin)
# Thin is built on EventMachine, doesn't need this thread
end
end
def self.spawn_eventmachine_thread
Thread.new {
EventMachine.run do
AMQP.channel ||= AMQP::Channel.new(AMQP.connect(:host => '127.0.0.1')) # Q_SERVER, :user=> Q_USER, :pass => Q_PASS, :vhost => Q_VHOST ))
AMQP.channel.on_error(&method(:handle_channel_exception))
AMQP.channel.queue(MixpanelJob::QUEUE_NAME, :exclusive => true)
.subscribe { |metadata, payload| MixpanelJob::handle_sending(metadata, payload) }
end
}
end
def self.handle_channel_exception(channel, channel_close)
Rails.logger.error "###############################################################################"
Rails.logger.error "Oops... a channel-level exception: code = #{channel_close.reply_code}, message = #{channel_close.reply_text}"
Rails.logger.error "###############################################################################"
end
def self.die_gracefully_on_signal
Signal.trap("INT") {
Rails.logger.error "###############################################################################"
Rails.logger.error "Stopping the EventMachine ......"
EventMachine.stop
Rails.logger.error "###############################################################################"
}
Signal.trap("TERM") {
Rails.logger.error "###############################################################################"
Rails.logger.error "Stopping the EventMachine ......"
EventMachine.stop
Rails.logger.error "###############################################################################"
}
end
end
AppEventMachine.start
在我使用 PhusionPassenger 启动 Rails 后,我看到它正在使用 PhusionPassenger 运行,然后我尝试向队列发送消息,但令我惊讶的是:
.subscribe { |metadata, payload| MixpanelJob::handle_sending(metadata, payload) }
subscribe
处理程序只执行一次,即只收到第一条消息,任何其他消息(第 2 条,第 3 条),处理subscribe
程序永远不会被调用。