0

我有一个有 1 名工人的队列。暗示。看起来像这样:

def work
    connection = get_connection
    connection.start
    channel = connection.create_channel
    queue   = channel.queue("crawl", :durable => true)

    multi_log " [*] Worker waiting for messages in #{queue.name}. To exit press CTRL+C"

    begin
      queue.subscribe(:ack => true, :block => true) do |delivery_info, properties, message|
        multi_log " [x] Worker received #{message}"
        process_work message
        channel.ack(delivery_info.delivery_tag)
        multi_log " [x] Worker job done for #{message}"
      end
    rescue => e
      log.error e.message
      log.error e.backtrace.join("\n")
      connection.close
    end
  end

只要作业执行,消费者就会阻塞。据我了解,该消费者一次处理一项工作。但让我感到困惑的是,在 RabbitMQ 管理 UI 中,我有时会看到该队列有 2 或 3 个 Unacked。但这怎么可能?

4

2 回答 2

0

我昨天晚上想通了:) 问题是预取没有明确设置。如果您设置channel.prefetch(1),那么消费者一次将只接收 1 条消息。如果未设置预取值,则消费者会尽可能多地接收消息。所以完整的代码现在看起来像这样。

  def work
    connection = get_connection
    connection.start
    channel = connection.create_channel
    channel.prefetch(1)
    queue   = channel.queue("crawl", :durable => true)

    multi_log " [*] Worker waiting for messages in #{queue.name}. To exit press CTRL+C"

    begin
      queue.subscribe(:ack => true, :block => true) do |delivery_info, properties, message|
        multi_log " [x] Worker received #{message}"
        process_work message
        channel.ack(delivery_info.delivery_tag)
        multi_log " [x] Worker job done for #{message}"
      end
    rescue => e
      log.error e.message
      log.error e.backtrace.join("\n")
      connection.close
    end
  end

现在 RabbitMQ 管理 UI 显示例如有 9 条消息准备就绪,1 条未确认,总共 10 条。这就像预期的那样。

顺便说一句,在 Java 中,您可以像这样设置预取channel.basicQos(1);

于 2016-03-05T08:26:35.787 回答
0

您的解决方案的发布方正在填满队列(间接通过交换 - 因为您没有提供详细信息,所以无法说明如何)。订阅者(工作者)的数量与未确认消息的数量无关。在您的情况下似乎很简单:发布者比订阅者快。

于 2016-03-04T18:48:43.267 回答