0

使用 Ruby amqp 库的 v0.7.1 和 Ruby 1.8.7,我试图将大量(数百万)短(约 40 字节)消息发布到 RabbitMQ 服务器。我的程序的主循环(嗯,不是真正的循环,但仍然)看起来像这样:

AMQP.start(:host => '1.2.3.4', 
       :username => 'foo',
       :password => 'bar') do |connection|

  channel  = AMQP::Channel.new(connection)
  exchange = channel.topic("foobar", {:durable => true})
  i = 0

  EM.add_periodic_timer(1) do
    print "\rPublished #{i} commits"
  end

  results = get_results # <- Returns an array 

  processor = proc do
    if x = results.shift then
        exchange.publish(x, :persistent => true, 
                         :routing_key => "test.#{i}")
        i += 1
        EM.next_tick processor
      end
  end
  EM.next_tick(processor)
  AMQP.stop {EM.stop} end

代码开始处理结果数组就好了,但过了一段时间(通常,在 12k 条消息之后)它会因以下错误而死

/Library/Ruby/Gems/1.8/gems/amqp-0.7.1/lib/amqp/channel.rb:807:in `send': 
The channel 1 was closed, you can't use it anymore! (AMQP::ChannelClosedError)

队列中不存储任何消息。该错误似乎只是在从程序到队列服务器的网络活动开始时发生。

我究竟做错了什么?

4

1 回答 1

-1

第一个错误是您没有发布您正在使用的 RabbitMQ 版本。很多人都在运行旧的过时版本 1.7.2,因为那是他们的操作系统包存储库中的内容。对于发送大量消息的人来说,这是一个糟糕的举动。从 RabbitMQ 站点本身获取 RabbitMQ 2.5.1 并摆脱您的默认系统包。

第二个错误是您没有告诉我们 RabbitMQ 日志中的内容。

第三个错误是你没有说什么在消费这些消息。是否有另一个进程在某个地方运行,它声明了一个队列并将其绑定到交换器。没有消息队列,除非有人向 RabbitMQ 声明并将其绑定到交换器即使这样,只有当队列的绑定键与您发布的路由键匹配时,消息才会流动。

第四个错误。您混合了路由键和绑定键。路由键是一个字符串,例如 topic.test.json.echos,而绑定键(用于将队列绑定到交换器)是一个类似于 topic.# 或 topic 的模式。.json。

在您澄清后更新 关于版本,我不确定它何时修复但在 1.7.2 中存在问题,大量持久性消息导致 RabbitMQ 在其持久性日志滚动时崩溃,并且在崩溃后它无法重新启动,直到有人手动取消翻转。

当您说正在打开和关闭连接时,我希望它不是每条消息。那将是一种使用 AMQP 的奇怪方式。

让我重复一遍。生产者不会将消息写入队列。他们将消息写入交换器,然后交换器根据路由键(字符串)和队列的绑定键(模式)将消息路由到队列。在您的示例中,我误读了 # 符号的使用,但我看不到任何声明队列并将其绑定到交换机的内容。

于 2011-08-20T06:31:56.560 回答