1

我正在用 Ruby 开发一个系统,该系统将利用 RabbitMQ 在执行某些工作时将消息发送到队列。我在用:

我在这个 gem 上看到的大多数示例都在 EM.add_periodic_timer 块中进行了发布调用。这不适用于我怀疑的绝大多数用例,当然也不适用于我的用例。我需要在完成一些工作时发布一条消息,因此将发布语句放在 add_periodic_timer 块中是不够的。

所以,我试图弄清楚如何将一些消息发布到队列中,然后“刷新”它,以便我发布的任何消息随后都会传递给我的订阅者。

为了让您了解我的意思,请考虑以下发布者代码:

#!/usr/bin/ruby

require 'rubygems'
require 'mq'

MESSAGES = ["hello","goodbye","test"]

AMQP.start do

  queue = MQ.queue('testq')
  messages_published = 0

  while (messages_published < 50)

    if (rand() < 0.4)
      message = MESSAGES[rand(MESSAGES.size)]
      puts "#{Time.now.to_s}: Publishing: #{message}"
      queue.publish(message)
      messages_published += 1
    end

    sleep(0.1)

  end

  AMQP.stop do
    EM.stop
  end

end

所以,这段代码只是简单地循环,在循环的每次迭代中以 40% 的概率发布一条消息,然后休眠 0.1 秒。它会一直执行此操作,直到发布了 50 条消息,然后停止 AMQP。当然,这只是一个概念证明。

现在,我的订阅者代码:

#!/usr/bin/ruby

require 'rubygems'
require 'mq'

AMQP.start do

  queue = MQ.queue('testq') 
  queue.subscribe do |header, msg|
    puts "#{Time.now.to_s}: Received #{msg}"
  end

end

因此,我们只需订阅队列,并且对于收到的每条消息,我们都会将其打印出来。

很好,除了订阅者只在发布者调用 AMQP.stop 时接收到全部 50 条消息。

这是我的出版商的输出。为简洁起见,它在中间被截断:

$ ruby publisher.rb 
2010-04-14 21:45:42 -0400: Publishing: test
2010-04-14 21:45:42 -0400: Publishing: hello
2010-04-14 21:45:42 -0400: Publishing: test
2010-04-14 21:45:43 -0400: Publishing: test
2010-04-14 21:45:44 -0400: Publishing: test
2010-04-14 21:45:44 -0400: Publishing: goodbye
2010-04-14 21:45:45 -0400: Publishing: goodbye
2010-04-14 21:45:45 -0400: Publishing: test
2010-04-14 21:45:45 -0400: Publishing: test
.
.
.
2010-04-14 21:45:55 -0400: Publishing: test
2010-04-14 21:45:55 -0400: Publishing: test
2010-04-14 21:45:55 -0400: Publishing: test
2010-04-14 21:45:55 -0400: Publishing: goodbye

接下来,我的订阅者的输出:

$ ruby consumer.rb
2010-04-14 21:45:56 -0400: Received test
2010-04-14 21:45:56 -0400: Received hello
2010-04-14 21:45:56 -0400: Received test
2010-04-14 21:45:56 -0400: Received test
2010-04-14 21:45:56 -0400: Received test
2010-04-14 21:45:56 -0400: Received goodbye
2010-04-14 21:45:56 -0400: Received goodbye
2010-04-14 21:45:56 -0400: Received test
2010-04-14 21:45:56 -0400: Received test
.
.
.
2010-04-14 21:45:56 -0400: Received test
2010-04-14 21:45:56 -0400: Received test
2010-04-14 21:45:56 -0400: Received test
2010-04-14 21:45:56 -0400: Received goodbye

如果您注意到输出中的时间戳,则订阅者仅在发布者停止 AMQP 并退出后才会收到所有消息。

那么,作为一个 AMQP 新手,我怎样才能让我的消息立即传递呢?我尝试将 AMQP.start 和 AMQP.stop 放在发布者的 while 循环的主体中,但是只有第一条消息被传递 - 虽然奇怪的是,如果我打开日志记录,服务器没有报告错误消息并且消息确实被发送到队列,但永远不会被订阅者接收。

建议将不胜感激。谢谢阅读。

4

1 回答 1

2

有关此问题的任何信息,请参阅http://groups.google.com/group/ruby-amqp/browse_thread/thread/311965bcf9697ece

我通过在我的发布者代码中添加一个额外的线程解决了这个问题:

!/usr/bin/ruby

require 'rubygems'
require 'mq'

MESSAGES = ["hello","goodbye","test"]

AMQP.start do

  Thread.new do
      queue = MQ.queue('testq')
      messages_published = 0

      while (messages_published < 50)

        if (rand() < 0.4)
          message = MESSAGES[rand(MESSAGES.size)]
          puts "#{Time.now.to_s}: Publishing: #{message}"
          queue.publish(message)
          messages_published += 1
        end

        sleep(0.1)

      end

      AMQP.stop do
        EM.stop
      end

  end

end
于 2010-04-15T02:04:13.210 回答