0

我想要做的是开始一个重复的过程,每小时将一批消息发布到我们服务器上的 RMQ 交换。

我有一个类,我们称之为它RMQProcess,它会触发一个 AMQP 事件循环。我认为我可以使用 rufus-scheduler 来做到这一点:

scheduler.every '10s', :times=>6 do
  process = RMQProcess.new
  process.start
end

scheduler.join

这有效......除了每次通过循环时,AMQP 通道都会增加(从 2 到 4 到 6 等......)。我认为这意味着通道没有正确关闭,这可能会出现问题。

我想总结一下我的问题,做这种事情的正确(或至少是正确)方法是什么?是否应该在进入调度程序进程之前启动 AMQP 进程,还是我这样做是正确的?我是否必须在 AMQP 事件循环中滚动我自己的调度逻辑?那是我的恐惧,因为似乎必须有更好的方法。任何建议表示赞赏。

作为参考,这里是 start 方法(在这种情况下,我只是使用 RandomText gem 发布无意义的句子):

def start
    begin
      puts @rmq_params
      AMQP.start(@rmq_params) do 
        |connection|

        connection.on_error do
          |ch, connection_close|
          puts "#{connection_close.reply_text}"
        end

        connection.on_tcp_connection_loss do
          |conn, settings|
          puts "[network failure] Trying to reconnect..."
          conn.reconnect(false, 2)
        end

        channel = AMQP::Channel.new(connection, :auto_recovery => true)
        puts "Channel ID = #{channel.id}"
        exchange = channel.direct(@exchangeName,:durable => true)
        exchange.publish(Lorem.words)

        EM.add_timer(@duration) do
            connection.close do
                EM.stop_event_loop
            end
        end


        Signal.trap("INT") do
          connection.close do
            EM.stop_event_loop
          end
        end

      end
    rescue Exception => e
      puts "#{e.message} #{e.backtrace.join("\n")}"
    end
end
4

1 回答 1

0

6.times do
  process = RMQProcess.new
  process.start
  sleep(10)
end

表现不同?

如果我是你,我会在 RMQProcess#start 之外“启动”AMQP 并重新使用生成的连接。

于 2014-04-06T21:57:06.517 回答