2

我有一个 ruby​​ 进程,它使用 amqp gem 消耗来自 RabbitMQ 队列的消息,如下所示:

require "bundler/setup"
require "amqp"
require 'eventmachine'
require 'em-http'


AMQP.start(:host => $AMQP_URL) do |connection|
  @channel ||= AMQP::Channel.new(connection)
  @queue   ||= @channel.queue("results")

  puts " [*] Waiting for messages. "

  @queue.subscribe do |body|    
    http = EventMachine::HttpRequest.new(URL).post :body => body          

    http.callback {
      # do something
    }        

    http.errback  { 
      $LOG.error "[errorback] -> #{http.error}"    
    }
  end
end

现在 URL 很慢,并且队列有很多消息(> 30K),我在日志中收到了这个错误:

**[errorback] -> unable to create new socket: Too many open files** 

任何帮助都将受到高度赞赏,因为我一直在尝试找出如何解决它但根本没有结果。

提前致谢

4

1 回答 1

5

你消费消息的速度太快了。由于您基本上一次获取所有未传递的消息(即 RabbitMQ 可以传递它们的速度),并为每条消息打开一个 HTTP 连接,因此您最终会消耗系统的所有可用资源(在这种情况下是并行打开套接字的数量) .

在阅读了有关消息确认AMQP::Queue#subscribe) 的文档后,我建议对您的代码进行以下更改:

AMQP.start(host: $AMQP_URL) do |connection|
  @channel ||= AMQP::Channel.new(connection)
  @channel.prefetch(5) 
  @queue ||= @channel.queue("results")

  # disable auto-ack, switch to manual mode
  @queue.subscribe(ack: true) do |meta, body|
    http = EventMachine::HttpRequest.new(URL).post body: body          

    http.callback {
      # do something

      # acknowledge message consumption
      meta.ack
    }

    http.errback {
      # ...
      # do not `meta.ack` here, so the message gets redelivered
    }
  end
end

这样,您可以很容易地限制您的工作量。

于 2013-04-27T13:03:54.047 回答