0

我无法弄清楚如何在本地订阅我的 RabbitMQ(当我使用 CloudAMPQ 时它起作用了。)我怀疑问题是我的 SendPriceJob 没有连接到正确的连接/通道/交换,但我不确定。

我有一个工作人员每分钟从 API 获取数据。

class FetchPriceJob
  @queue = :update_price

  def self.perform
    # Do some stuff

    FetchPriceJob.new.publish(response.to_json)
  end

  def publish(data)
    channel.default_exchange.publish(data, routing_key: queue.name)
    connection.close
  end

  def connection
    @conn ||= begin
      conn = Bunny.new(host: "localhost", vhost: "/", user: "guest", password: "guest")
      conn.start
    end
  end

  def channel
    @channel ||= connection.create_channel
  end

  def queue
    @queue ||= channel.queue('current_prices')
  end
end

我有另一个工作人员打开连接并收听。

module SendPriceJob
  @queue = :price_serve

  def self.perform
    conn = Bunny.new(host: "localhost", vhost: "/", user: "guest", password: "guest")
    conn.start
    ch  = conn.create_channel
    x   = ch.default_exchange
    q   = ch.queue('current_prices')
    begin
      q.subscribe(block: true) do |_, _, body|
        ActionCable.server.broadcast 'prices', body
      end
    rescue Interrupt => _
      ch.close
      conn.close
    end
  end
end

这两个工作人员由流程管理器启动。

# Procfile
elastic: elasticsearch
redis: redis-server
web: rails server -p 3000
send_worker: QUEUE=price_serve rake resque:work
fetch_worker: QUEUE=update_price rake resque:work
scheduler: rake resque:scheduler

我正在运行我的 RabbitMQ 服务器: http: //prntscr.com/i6rcwv。我成功地在连接/通道/交换上排队消息:http: //prntscr.com/i6rd7q。从我的日志看来,我正在运行的调度程序和生产者一样工作:http: //prntscr.com/i6rdxf

这是我第一次合作,message queues所以我可能做错了什么。我觉得我应该接近,因为它正在使用 CloudAMQP。唯一的区别是Bunny.new配置为连接到外部 API。

4

2 回答 2

2

您可以使用sneakerspwwka gem 对 RabbitMQ 消息进行后台处理。为您节省了订阅消息队列所涉及的大量瓶颈。

于 2018-01-28T19:02:51.687 回答
1

在这种方法中:

def publish(data)
  channel.default_exchange.publish(data, routing_key: queue.name)
  connection.close
end

你可以试试这个:

queue.publish(data, routing_key: queue.name)

文档:http ://rubybunny.info/articles/exchanges.html#default_exchange

于 2018-01-28T18:08:17.227 回答