2

我有一个 Rails 应用程序,有时使用 gem“Bunny”将消息发布到 RabbitMQ 队列。这是设置:

# config/initializers/bunny.rb
$mq_connection = Bunny.new
$mq_connection.start
$mq_channel = $mq_connection.create_channel

然后我可以在应用程序中的任何地方调用:

exchange = $mq_channel.default_exchange
exchange.publish(msg.to_json, persistent: true, routing_key: '...')

如果我从应用程序或控制台调用它,这很好用,但如果从 DelayedJob 作业调用它就不起作用。没有引发异常,但只是没有发送消息。

尝试单例:

看起来像$mq_channelDelayedJob 找不到全局变量,所以我创建了一个单例模型来存储它:

class RabbitMq
  include Singleton

  attr_accessor :connection, :channel

  def exchange
    channel.default_exchange
  end

  def setup
    self.connection = Bunny.new
    self.connection.start
    self.channel = connection.create_channel
  end

end

我从我的初始化程序调用设置:

# config/initializers/bunny.rb
RabbitMq.instance.setup

但这也行不通。作业终止且没有错误,但没有发布任何内容。

知道怎么做吗?从 DJ 之类的后台工作人员向 RabbitMQ 发布消息应该很常见。

4

1 回答 1

3

这是我的做法:

class Messaging::Publisher

  class << self

    def publish(message)
      new(message).publish
    end

  end # Class Methods

  #=========================================================================
  # Instance Methods      
  #=========================================================================

    def initialize(message)
      @message = message
    end

    def publish
      connection = Bunny.new(ENV['CLOUDAMQP_URL'])
      connection.start
      channel = connection.create_channel
      queue_name = "#{ENV['app_name']}.#{message.keys.first.to_s.pluralize}_queue"
      queue = channel.queue(queue_name, durable: true)
      channel.default_exchange.publish(message.to_json, :routing_key => queue.name)
      channel.close
      connection.stop
      true
    end

  private

    def message()   @message    end

end

我从我的应用程序(同步)和后台作业(异步)中调用它。像这样的东西:

class ServiceRequests::CreateManager < ServiceRequests::ManagerBase

  class << self

  private

  end # Class Methods

  #=========================================================================
  # Instance Methods
  #=========================================================================

    def manage
      Messaging::Publisher.publish service_request_message
    end

  private

    def service_request_message
      {
        service_request: {
          provider: {
            name: "Foo::Bar"
          },
          params: {
            baz: 'qux'
          }
        }
      }
    end

end
于 2017-05-19T14:12:55.913 回答