使用 Ruby amqp 库的 v0.7.1 和 Ruby 1.8.7,我试图将大量(数百万)短(约 40 字节)消息发布到 RabbitMQ 服务器。我的程序的主循环(嗯,不是真正的循环,但仍然)看起来像这样:
AMQP.start(:host => '1.2.3.4',
:username => 'foo',
:password => 'bar') do |connection|
channel = AMQP::Channel.new(connection)
exchange = channel.topic("foobar", {:durable => true})
i = 0
EM.add_periodic_timer(1) do
print "\rPublished #{i} commits"
end
results = get_results # <- Returns an array
processor = proc do
if x = results.shift then
exchange.publish(x, :persistent => true,
:routing_key => "test.#{i}")
i += 1
EM.next_tick processor
end
end
EM.next_tick(processor)
AMQP.stop {EM.stop} end
代码开始处理结果数组就好了,但过了一段时间(通常,在 12k 条消息之后)它会因以下错误而死
/Library/Ruby/Gems/1.8/gems/amqp-0.7.1/lib/amqp/channel.rb:807:in `send':
The channel 1 was closed, you can't use it anymore! (AMQP::ChannelClosedError)
队列中不存储任何消息。该错误似乎只是在从程序到队列服务器的网络活动开始时发生。
我究竟做错了什么?