我想要做的是开始一个重复的过程,每小时将一批消息发布到我们服务器上的 RMQ 交换。
我有一个类,我们称之为它RMQProcess
,它会触发一个 AMQP 事件循环。我认为我可以使用 rufus-scheduler 来做到这一点:
scheduler.every '10s', :times=>6 do
process = RMQProcess.new
process.start
end
scheduler.join
这有效......除了每次通过循环时,AMQP 通道都会增加(从 2 到 4 到 6 等......)。我认为这意味着通道没有正确关闭,这可能会出现问题。
我想总结一下我的问题,做这种事情的正确(或至少是正确)方法是什么?是否应该在进入调度程序进程之前启动 AMQP 进程,还是我这样做是正确的?我是否必须在 AMQP 事件循环中滚动我自己的调度逻辑?那是我的恐惧,因为似乎必须有更好的方法。任何建议表示赞赏。
作为参考,这里是 start 方法(在这种情况下,我只是使用 RandomText gem 发布无意义的句子):
def start
begin
puts @rmq_params
AMQP.start(@rmq_params) do
|connection|
connection.on_error do
|ch, connection_close|
puts "#{connection_close.reply_text}"
end
connection.on_tcp_connection_loss do
|conn, settings|
puts "[network failure] Trying to reconnect..."
conn.reconnect(false, 2)
end
channel = AMQP::Channel.new(connection, :auto_recovery => true)
puts "Channel ID = #{channel.id}"
exchange = channel.direct(@exchangeName,:durable => true)
exchange.publish(Lorem.words)
EM.add_timer(@duration) do
connection.close do
EM.stop_event_loop
end
end
Signal.trap("INT") do
connection.close do
EM.stop_event_loop
end
end
end
rescue Exception => e
puts "#{e.message} #{e.backtrace.join("\n")}"
end
end