2

我需要从 Rails 应用程序中收集一些数据,对其进行聚合,然后定期将其发送到远程服务器。我在 application.rb 中的全局变量(我知道,我知道)中实例化我的聚合类。

在我的聚合类中,我启动了一个休眠 10 秒的线程,然后查看队列、处理数据并发送它。队列是存储在类的实例变量中的散列。

从 rails 控制器,我调用聚合器类中的一个方法来对散列中的数据进行排队。当然,这与读取队列的后台任务在不同的线程上。问题是后台任务永远不会在哈希中看到任何数据。在我的日志中,当我写入(从控制器线程)和从它读取(从后台线程)时,我都会打印出散列的 object_id。两个线程的 hash#object_id 匹配,但后台线程永远不会看到数据。

让我丧命的是,这在 rails 之外可以正常工作。我已经设置了许多线程的测试,这些线程真的很重要,而且效果很好(为了清楚起见,我没有显示一些线程保护)。有谁知道's 怎么匹配,但内容不一致?object_id

class Aggregator

def initialize
  @q = {}
  @timer = nil
end

def start
  @timer = Thread.new do
    loop do
      sleep(10)
      flush_q
    end
  end
end

def flush_q
  logger.debug "flush: q.object_id = #{@q.object_id}"  # matches what I get below
  logger.debug "flush: q.length = #{@q.length}"   # always zero!
  @q.each_pair do |k,v|
    # pack it up and send it
  end
  @q.clear
end

def add(item)
  logger.debug "add: q.object_id = #{@q.object_id}"  # matches what I get above
  @q[item.name] ||= item
  logger.debug "add: q.length = #{@q.length}"   # increases with each add
  # not actually that simple, but not relevant
end

end
4

1 回答 1

0

我将冒险并假设您的代码是使用分叉应用服务器(例如独角兽或乘客)部署的。

这意味着您的应用程序加载一次,然后从该主实例派生新实例。分叉很便宜,因此这意味着应用程序的新实例可以非常快速地启动/关闭。

我相信您的聚合器实例正在此主进程中创建/启动。当这个 fork 进程的整个内存空间被复制时(所以在新进程中有一个聚合器实例,具有相同的对象 id 等等)。

然而,当只复制当前线程时,聚合器刷新只发生在主进程中,但所有附加都发生在子进程中。您可以通过添加到您的日志来确认这一点Proccess.pid- 您应该看到您的日志来自 2 个不同的进程。

解决此问题的一种方法是在子进程分叉后启动/重新启动线程。您如何执行此操作取决于应用程序的服务方式。使用 unicorn,您可以通过该after_fork方法在您的 unicorn 配置中执行此操作。和乘客一起做

PhusionPassenger.on_event(:starting_worker_process) do |forked|
  if forked
    ...
  end
end
于 2013-07-17T21:22:48.890 回答