为什么我的 RabbitMQ 消息在 ACKd 后被重新传递给我的消费者?我是 RabbitMQ 的新手;我一定是在滥用它,或者 Ruby amqp gem 可能有问题。
我有一个订阅队列并确认每条消息的 ruby 脚本。如果我让它一直通过消息,消息确实会从队列中消失;他们没有重新交付。但是,如果我在脚本全部确认之前中断我的脚本,然后再次启动脚本,则从第一条消息重新开始传递。
我在代码中看到的行为由 RabbitMQ Web 管理界面准确反映;队列有消息,尽管有 ACK,但它们并没有消失。
线索:我在队列中放入了大约 5000 条消息。如果我让消费者大量确认,则实际上确实似乎从队列中删除了一些消息(与我上面所说的相反)。我一直无法确定这种现象。
我正在使用 ruby 1.9.3、RabbitMQ 2.8.7 和 amqp ruby gem 0.9.8。它发生在 Ubuntu 12.0.4 或 Mac OS 10.7.4 上的生产者和消费者身上。
有没有搞错??
(请参阅此消息末尾的更新)
这是消费者的代码:
# encoding: utf-8
require "rubygems"
require 'amqp'
require 'aws-sdk'
queue_name = "some.queue"
begin
AMQP.start("amqp://localhost:5672") do | connection |
channel = AMQP::Channel.new(connection)
queue = channel.queue(queue_name, :durable => true)
queue.subscribe(:ack => true) do | metadata, payload |
metadata.ack
end
end
end
这是制作人:
# encoding: utf-8
require "rubygems"
require 'amqp'
require 'aws-sdk'
msg = ARGV[0]
queue_name = "some.queue"
begin
AMQP.start("amqp://localhost:5672") do | connection |
channel = AMQP::Channel.new(connection)
queue = channel.queue(queue_name, :durable => true)
(1..5000).each do | x |
channel.default_exchange.publish x, :routing_key => queue_name, :persistent => true
end
end
end
使用 Wireshark,我确定我发送的确认没有发送给代理。我调用了 metadata.ack,但没有发送任何数据包。
根据@Robthewolf 的建议,我尝试了 channel.prefetch(1)。当我使用那个调用时,每个确认都会发送给经纪人。一般来说,如果我调用了 channel.prefetch(n),那么一旦我发送了 n(或者有时是 n+1)个确认,它就会将它们发送给代理。
所以我有一个新问题:为什么 prefetch() 参数决定了在最终发送到代理之前必须发送多少个 ack?