TLDR;
为什么 Sneakers worker 连接不上数据库或者查询不到?
(也欢迎在评论中提供关于“做”和“不做”的一般建议)
完整问题:
我能够执行返回简单字符串的 RPC 调用,但无法执行在服务器端查询数据库的 RPC 调用。我阅读了文档,尝试了许多 SO 帖子和博客教程,但我仍然缺少一些内容。
我有两个服务。第一个服务 ( Client ) 正在使用Bunny gem 并正在对第二个服务 ( RPCServer ) 进行 RPC 调用,该服务正在使用Sneakers gem监听工作人员。这两项服务都是 Rails 应用程序。
RabbitMQ在 docker 容器中提供服务:
docker run -p 5672:5672 -p 15672:15672 rabbitmq:3-management
Postgres数据库安装在本地机器上。
客户服务(主要来自Rabbitbunny 文档):
# app/services/client.rb
class Client
attr_accessor :call_id, :lock, :condition, :reply_queue, :exchange, :params, :response, :server_queue_name, :channel, :reply_queue_name
def initialize(rpc_route:, params:)
@channel = channel
@exchange = channel.fanout("Client.Server.exchange.#{params[:controller]}")
@server_queue_name = "Server.Client.queue.#{rpc_route}"
@reply_queue_name = "Client.Server.queue.#{params[:controller]}"
@params = params
setup_reply_queue
end
def setup_reply_queue
@lock = Mutex.new
@condition = ConditionVariable.new
that = self
@reply_queue = channel.queue(reply_queue_name, durable: true)
reply_queue.subscribe do |_delivery_info, properties, payload|
if properties[:correlation_id] == that.call_id
that.response = payload
that.lock.synchronize { that.condition.signal }
end
end
end
def call
@call_id = "NAIVE_RAND_#{rand}#{rand}#{rand}"
exchange.publish(params.to_json,
routing_key: server_queue_name,
correlation_id: call_id,
reply_to: reply_queue.name)
lock.synchronize { condition.wait(lock) }
connection.close
response
end
def channel
@channel ||= connection.create_channel
end
def connection
@connection ||= Bunny.new.tap { |c| c.start }
end
end
RPCServer 服务,使用这个要点(这里的评论是我问题的“肉”:
# app/workers/posts_worker.rb
require 'sneakers'
require 'sneakers/runner'
require 'byebug'
require 'oj'
class RpcServer
include Sneakers::Worker
from_queue 'Client.Server.queue.v1/filters/posts', durable: true, env: nil
def work_with_params(deserialized_msg, delivery_info, metadata)
post = {}
p "ActiveRecord::Base.connected?: #{ActiveRecord::Base.connected?}" # => true
##### This gets logged
Rails.logger.info "ActiveRecord::Base.connection_pool: #{ActiveRecord::Base.connection_pool}\n\n-------"
##### This never gets logged
Rails.logger.info "ActiveRecord::Base.connection_pool.with_connection: #{ActiveRecord::Base.connection_pool.with_connection}\n\n--------"
### interpreter never reaches this place when ActiveRecord methods like `with_connection`, `where`, `count` etc. are used
ActiveRecord::Base.connection_pool.with_connection do
post = Post.first.to_json
end
##### first commented `publish()` works fine and RPC works when no ActiveRecord is involved (this is, assuming above code using ActiveRecord is commented out)
##### second publish is not working
# publish("response from RPCServer", {
publish(post.to_json, {
to_queue: metadata[:reply_to],
correlation_id: metadata[:correlation_id],
content_type: metadata[:content_type]
})
ack!
end
end
Sneakers::Runner.new([RpcServer]).run
RPCServer运动鞋配置:
# config/initializers/sneakers.rb
Sneakers.configure({
amqp: "amqp://guest:guest@localhost:5672",
vhost: '/',
workers: 4,
log: 'log/sneakers.log',
pid_path: "tmp/pids/sneakers.pid",
timeout_job_after: 5,
prefetch: 10,
threads: 10,
durable: true,
ack: true,
heartbeat: 2,
exchange: "",
hooks: {
before_fork: -> {
Rails.logger.info('Worker: Disconnect from the database')
ActiveRecord::Base.connection_pool.disconnect!
Rails.logger.info("before_fork: ActiveRecord::Base.connected?: #{ActiveRecord::Base.connected?}") # => false
},
after_fork: -> {
ActiveRecord::Base.connection
Rails.logger.info("after_fork: ActiveRecord::Base.connected?: #{ActiveRecord::Base.connected?}") # => true
Rails.logger.info('Worker: Reconnect to the database')
},
timeout_job_after: 60
})
Sneakers.logger.level = Logger::INFO
RPCServer puma 配置:
# config/puma.rb
threads_count = ENV.fetch("RAILS_MAX_THREADS") { 5 }
threads threads_count, threads_count
port ENV.fetch("PORT") { 3000 }
environment ENV.fetch("RAILS_ENV") { "development" }
workers ENV.fetch("WEB_CONCURRENCY") { 2 }
preload_app!
### tried and did not work
# on_worker_boot do
# ActiveSupport.on_load(:active_record) do
# ActiveRecord::Base.establish_connection
# end
# end
before_fork do |server, worker|
# other settings
if defined?(ActiveRecord::Base)
ActiveRecord::Base.connection.disconnect!
end
end
after_worker_boot do |server, worker|
if defined?(ActiveRecord::Base)
ActiveRecord::Base.establish_connection
end
end
plugin :tmp_restart
为了完整起见,我还有一个外部 Rakefile 将队列绑定到交换(在这种情况下可能不重要)
namespace :rabbitmq do
desc "Setup routing"
task :setup do
conn = start_bunny
rpc_route service: :blog, from: 'v1/filters/posts_mappings', to: 'v1/filters/posts'
conn.close
end
def rpc_route(service:, from:, to:)
...
end
def start_bunny
...
end
end
我尝试了许多球鞋配置,以及许多启动rabbitmq、重置它、删除队列、连接等的命令。所有这些都很难在这里列出,可能不是这样。
为什么我无法连接到数据库或执行 ActiveRecord 方法?我错过了什么?