我正在为我的 Ruby on Rails 应用程序使用 AMQP / RabbitMQ。
我将以下amqp.rb
文件放在config/initializers
:(从配方复制并更改:http ://www.hiringthing.com/2011/11/04/eventmachine-with-rails.html#sthash.iqCWUtOn.dpbs )
require 'amqp'
# References:
# 1. Getting Started with AMQP and Ruby
# http://rubyamqp.info/articles/getting_started/
# 2. EventMachine and Rails
# http://www.hiringthing.com/2011/11/04/eventmachine-with-rails.html#sthash.iqCWUtOn.dpbs
# 3. Connecting to the broker, integrating with Ruby on Rails, Merb and Sinatra
# http://rubyamqp.info/articles/connecting_to_broker/
module AppEventMachine
def self.start
if defined?(PhusionPassenger)
Rails.logger.info "###############################################################################"
Rails.logger.info "Running EventMachine/Rails with PhusionPassenger ......"
Rails.logger.info "###############################################################################"
PhusionPassenger.on_event(:starting_worker_process) do |forked|
# => for passenger, we need to avoid orphaned threads
if forked && EventMachine.reactor_running?
EventMachine.stop
end
spawn_eventmachine_thread
die_gracefully_on_signal
end
else
Rails.logger.info "###############################################################################"
Rails.logger.info "PhusionPassenger is not running. Probably you are running Rails locally ......"
Rails.logger.info "###############################################################################"
# faciliates debugging
Thread.abort_on_exception = true
# just spawn a thread and start it up
spawn_eventmachine_thread unless defined?(Thin)
# Thin is built on EventMachine, doesn't need this thread
end
end
def self.spawn_eventmachine_thread
Thread.new {
EventMachine.run do
AMQP.channel ||= AMQP::Channel.new(AMQP.connect(:host => '127.0.0.1')) # Q_SERVER, :user=> Q_USER, :pass => Q_PASS, :vhost => Q_VHOST ))
AMQP.channel.on_error(&method(:handle_channel_exception))
AMQP.channel.queue(MixpanelJob::QUEUE_NAME, :exclusive => true)
.subscribe(&method(:handle_sending))
end
}
end
def self.handle_sending(metadata, payload)
puts "handle_sending: ######################"
Rails.logger.info "Received a message: #{payload}, content_type = #{metadata.content_type}"
Rails.logger.info "Sending to the Mixpanel server ......"
# In a worker process on this/another machine
Rails.logger.info "Sent the message to the Mixpanel server ......"
end
def self.handle_channel_exception(channel, channel_close)
Rails.logger.error "###############################################################################"
Rails.logger.error "Oops... a channel-level exception: code = #{channel_close.reply_code}, message = #{channel_close.reply_text}"
Rails.logger.error "###############################################################################"
end
def self.die_gracefully_on_signal
Rails.logger.error "###############################################################################"
Rails.logger.error "Stopping the EventMachine ......"
Signal.trap("INT") { EventMachine.stop }
Signal.trap("TERM") { EventMachine.stop }
Rails.logger.error "###############################################################################"
end
end
AppEventMachine.start
订阅处理程序handle_sending
需要很长时间(数分钟或超过 10 分钟)来响应。
这是正常的吗?那么我需要在 AMQP 或 RabbitMQ 中进行任何配置设置吗?
谢谢。
编辑
如果我在控制台上执行此页面 ( http://rubyamqp.info/articles/getting_started/ )的“Hello World”示例
rails c
,它会非常快。我已经设置了
ulimit -n 1024
如果我
next_tick
喜欢下面的内容rails c
,它会非常快:
=========
require "rubygems"
require "amqp"
EventMachine.run do
EventMachine.next_tick {
puts "next_tick triggerred"
}
end
=========