0

为什么我的 RabbitMQ 消息在 ACKd 后被重新传递给我的消费者?我是 RabbitMQ 的新手;我一定是在滥用它,或者 Ruby amqp gem 可能有问题。

我有一个订阅队列并确认每条消息的 ruby​​ 脚本。如果我让它一直通过消息,消息确实会从队列中消失;他们没有重新交付。但是,如果我在脚本全部确认之前中断我的脚本,然后再次启动脚本,则从第一条消息重新开始传递。

我在代码中看到的行为由 RabbitMQ Web 管理界面准确反映;队列有消息,尽管有 ACK,但它们并没有消失。

线索:我在队列中放入了大约 5000 条消息。如果我让消费者​​大量确认,则实际上确实似乎从队列中删除了一些消息(与我上面所说的相反)。我一直无法确定这种现象。

我正在使用 ruby​​ 1.9.3、RabbitMQ 2.8.7 和 amqp ruby​​ gem 0.9.8。它发生在 Ubuntu 12.0.4 或 Mac OS 10.7.4 上的生产者和消费者身上。

有没有搞错??

(请参阅此消息末尾的更新)

这是消费者的代码:

# encoding: utf-8
require "rubygems"
require 'amqp'
require 'aws-sdk'

queue_name = "some.queue"
begin
  AMQP.start("amqp://localhost:5672") do | connection |
    channel  = AMQP::Channel.new(connection)
    queue = channel.queue(queue_name, :durable => true)
    queue.subscribe(:ack => true) do | metadata, payload |
      metadata.ack
    end
  end
end

这是制作人:

# encoding: utf-8
require "rubygems"
require 'amqp'
require 'aws-sdk'

msg = ARGV[0]
queue_name = "some.queue"
begin
  AMQP.start("amqp://localhost:5672") do | connection |
    channel  = AMQP::Channel.new(connection)
    queue    = channel.queue(queue_name, :durable => true)
    (1..5000).each do | x |
      channel.default_exchange.publish x, :routing_key => queue_name, :persistent => true
    end
  end
end

使用 Wireshark,我确定我发送的确认没有发送给代理。我调用了 metadata.ack,但没有发送任何数据包。

根据@Robthewolf 的建议,我尝试了 channel.prefetch(1)。当我使用那个调用时,每个确认都会发送给经纪人。一般来说,如果我调用了 channel.prefetch(n),那么一旦我发送了 n(或者有时是 n+1)个确认,它就会将它们发送给代理。

所以我有一个新问题:为什么 prefetch() 参数决定了在最终发送到代理之前必须发送多少个 ack?

4

1 回答 1

1

如果您的消费者停止,所有未确认的消息都将返回到队列中。如果只有少数人被确认,他们将不会被返回到队列中,但其他人会被返回。您可以使用channel.basicQos(1);它来确保您一次只能从队列中读取 1 个项目。在第一个项目被确认之前,不会读取任何新项目。

于 2012-11-07T05:52:24.083 回答