我有一个运行 ruby 2.4.4 的 Rails 应用程序,使用 Unicorn 作为 Web 服务器,它使用单例在后台线程中从 Kafka 读取。这个想法是每个独角兽进程都有一个单例实例。所以 4 个进程,4 个单例。
after_fork
我在我的独角兽配置中启动了钩子内的卡夫卡消费。我可以成功等待历史消息的消费完成(通过立即进行撬动来验证)。
但是,当我到达服务流量的地步时,单例实例是 a) 一个不同的实例,并且 b) 为空 - 先前设置的 ivar 已消失。
我已经确认我在同一个进程和同一个线程中。
设置如下:
# background_foo_consumer.rb
class BackgroundFooConsumer
include Singleton
attr_reader :background_consumer
def add_background_consumer(consumer, topics, options: nil)
@background_consumer ||= BackgroundKafkaConsumer.new(consumer, topics, options: options)
end
def processed_historical_messages?
background_consumer&.consumer&.reached_head
end
end
# config/unicorn.rb
after_worker_ready do |server, worker|
BackgroundFooConsumer.instance.add_background_consumer(nil, ["foos"])
BackgroundFooConsumer.instance.background_consumer.start
BackgroundFooConsumer.instance.background_consumer.consumer.mutex.synchronize {
BackgroundFooConsumer
.instance.background_consumer.consumer.processed_historical_messages.wait(
BackgroundFooConsumer.instance.background_consumer.consumer.mutex
)
}
end
end
我确认我在同一个进程中,甚至在同一个线程中,因为我可以通过替换include Singleton
自定义实现和线程局部变量成功地将正确的对象传递给应用程序,如下所示:
# config/unicorn.rb
after_worker_ready do |server, worker|
# ... same as above
Thread.current[:background_foo_consumer] = BackgroundFooConsumer.instance
end
# background_foo_consumer.rb
class BackgroundFooConsumer
attr_reader :background_consumer
def self.instance
@instance ||= begin
Thread.current[:background_foo_consumer] || self.new
ensure
Thread.current[:background_foo_consumer] = nil
end
end
end
在这个实现中,当我从我的应用程序提供流量时,BackgroundFooConsumer.instance
是在钩子中创建的正确实例after_fork
,并且每个独角兽进程都有一个独立的实例,通过检查对象 ID 来确认。
我不相信这是 GC,至少底层对象没有被清除,我已经通过在 after_fork 钩子中设置 Thread 局部变量来确认这一点,然后include Singleton
在我的消费者类中使用。我仍然得到空/新单例,但是如果我直接查询它,线程局部变量仍然存在。
我目前的假设是,这与写入时复制有关,通过设置线程局部变量,我以某种方式强制 ruby 仅为该进程创建一个单例并将其保存到该变量中。
所以我的问题是单例实例如何在单个线程中像这样消失?我怎样才能阻止它发生?如果可以的话,我宁愿不使用这些线程局部变量。