我正在用 Ruby 开发一个系统,该系统将利用 RabbitMQ 在执行某些工作时将消息发送到队列。我在用:
- Ruby 1.9.1 稳定版
- 兔MQ 1.7.2
- AMQP gem v0.6.7 ( http://github.com/tmm1/amqp )
我在这个 gem 上看到的大多数示例都在 EM.add_periodic_timer 块中进行了发布调用。这不适用于我怀疑的绝大多数用例,当然也不适用于我的用例。我需要在完成一些工作时发布一条消息,因此将发布语句放在 add_periodic_timer 块中是不够的。
所以,我试图弄清楚如何将一些消息发布到队列中,然后“刷新”它,以便我发布的任何消息随后都会传递给我的订阅者。
为了让您了解我的意思,请考虑以下发布者代码:
#!/usr/bin/ruby
require 'rubygems'
require 'mq'
MESSAGES = ["hello","goodbye","test"]
AMQP.start do
queue = MQ.queue('testq')
messages_published = 0
while (messages_published < 50)
if (rand() < 0.4)
message = MESSAGES[rand(MESSAGES.size)]
puts "#{Time.now.to_s}: Publishing: #{message}"
queue.publish(message)
messages_published += 1
end
sleep(0.1)
end
AMQP.stop do
EM.stop
end
end
所以,这段代码只是简单地循环,在循环的每次迭代中以 40% 的概率发布一条消息,然后休眠 0.1 秒。它会一直执行此操作,直到发布了 50 条消息,然后停止 AMQP。当然,这只是一个概念证明。
现在,我的订阅者代码:
#!/usr/bin/ruby
require 'rubygems'
require 'mq'
AMQP.start do
queue = MQ.queue('testq')
queue.subscribe do |header, msg|
puts "#{Time.now.to_s}: Received #{msg}"
end
end
因此,我们只需订阅队列,并且对于收到的每条消息,我们都会将其打印出来。
很好,除了订阅者只在发布者调用 AMQP.stop 时接收到全部 50 条消息。
这是我的出版商的输出。为简洁起见,它在中间被截断:
$ ruby publisher.rb
2010-04-14 21:45:42 -0400: Publishing: test
2010-04-14 21:45:42 -0400: Publishing: hello
2010-04-14 21:45:42 -0400: Publishing: test
2010-04-14 21:45:43 -0400: Publishing: test
2010-04-14 21:45:44 -0400: Publishing: test
2010-04-14 21:45:44 -0400: Publishing: goodbye
2010-04-14 21:45:45 -0400: Publishing: goodbye
2010-04-14 21:45:45 -0400: Publishing: test
2010-04-14 21:45:45 -0400: Publishing: test
.
.
.
2010-04-14 21:45:55 -0400: Publishing: test
2010-04-14 21:45:55 -0400: Publishing: test
2010-04-14 21:45:55 -0400: Publishing: test
2010-04-14 21:45:55 -0400: Publishing: goodbye
接下来,我的订阅者的输出:
$ ruby consumer.rb
2010-04-14 21:45:56 -0400: Received test
2010-04-14 21:45:56 -0400: Received hello
2010-04-14 21:45:56 -0400: Received test
2010-04-14 21:45:56 -0400: Received test
2010-04-14 21:45:56 -0400: Received test
2010-04-14 21:45:56 -0400: Received goodbye
2010-04-14 21:45:56 -0400: Received goodbye
2010-04-14 21:45:56 -0400: Received test
2010-04-14 21:45:56 -0400: Received test
.
.
.
2010-04-14 21:45:56 -0400: Received test
2010-04-14 21:45:56 -0400: Received test
2010-04-14 21:45:56 -0400: Received test
2010-04-14 21:45:56 -0400: Received goodbye
如果您注意到输出中的时间戳,则订阅者仅在发布者停止 AMQP 并退出后才会收到所有消息。
那么,作为一个 AMQP 新手,我怎样才能让我的消息立即传递呢?我尝试将 AMQP.start 和 AMQP.stop 放在发布者的 while 循环的主体中,但是只有第一条消息被传递 - 虽然奇怪的是,如果我打开日志记录,服务器没有报告错误消息并且消息确实被发送到队列,但永远不会被订阅者接收。
建议将不胜感激。谢谢阅读。