我无法弄清楚如何在本地订阅我的 RabbitMQ(当我使用 CloudAMPQ 时它起作用了。)我怀疑问题是我的 SendPriceJob 没有连接到正确的连接/通道/交换,但我不确定。
我有一个工作人员每分钟从 API 获取数据。
class FetchPriceJob
@queue = :update_price
def self.perform
# Do some stuff
FetchPriceJob.new.publish(response.to_json)
end
def publish(data)
channel.default_exchange.publish(data, routing_key: queue.name)
connection.close
end
def connection
@conn ||= begin
conn = Bunny.new(host: "localhost", vhost: "/", user: "guest", password: "guest")
conn.start
end
end
def channel
@channel ||= connection.create_channel
end
def queue
@queue ||= channel.queue('current_prices')
end
end
我有另一个工作人员打开连接并收听。
module SendPriceJob
@queue = :price_serve
def self.perform
conn = Bunny.new(host: "localhost", vhost: "/", user: "guest", password: "guest")
conn.start
ch = conn.create_channel
x = ch.default_exchange
q = ch.queue('current_prices')
begin
q.subscribe(block: true) do |_, _, body|
ActionCable.server.broadcast 'prices', body
end
rescue Interrupt => _
ch.close
conn.close
end
end
end
这两个工作人员由流程管理器启动。
# Procfile
elastic: elasticsearch
redis: redis-server
web: rails server -p 3000
send_worker: QUEUE=price_serve rake resque:work
fetch_worker: QUEUE=update_price rake resque:work
scheduler: rake resque:scheduler
我正在运行我的 RabbitMQ 服务器: http: //prntscr.com/i6rcwv。我成功地在连接/通道/交换上排队消息:http: //prntscr.com/i6rd7q。从我的日志看来,我正在运行的调度程序和生产者一样工作:http: //prntscr.com/i6rdxf。
这是我第一次合作,message queues
所以我可能做错了什么。我觉得我应该接近,因为它正在使用 CloudAMQP。唯一的区别是Bunny.new
配置为连接到外部 API。