2

TLDR;

为什么 Sneakers worker 连接不上数据库或者查询不到?

(也欢迎在评论中提供关于“做”和“不做”的一般建议)

完整问题:

我能够执行返回简单字符串的 RPC 调用,但无法执行在服务器端查询数据库的 RPC 调用。我阅读了文档,尝试了许多 SO 帖子和博客教程,但我仍然缺少一些内容。

我有两个服务。第一个服务 ( Client ) 正在使用Bunny gem 并正在对第二个服务 ( RPCServer ) 进行 RPC 调用,该服务正在使用Sneakers gem监听工作人员。这两项服务都是 Rails 应用程序。

RabbitMQ在 docker 容器中提供服务:

docker run -p 5672:5672 -p 15672:15672 rabbitmq:3-management

Postgres数据库安装在本地机器上。

客户服务(主要来自Rabbitbunny 文档):

# app/services/client.rb
class Client
  attr_accessor :call_id, :lock, :condition, :reply_queue, :exchange, :params, :response, :server_queue_name, :channel, :reply_queue_name

  def initialize(rpc_route:, params:)
    @channel = channel
    @exchange = channel.fanout("Client.Server.exchange.#{params[:controller]}")
    @server_queue_name = "Server.Client.queue.#{rpc_route}"
    @reply_queue_name = "Client.Server.queue.#{params[:controller]}"
    @params = params
    setup_reply_queue
  end

  def setup_reply_queue
    @lock = Mutex.new
    @condition = ConditionVariable.new
    that = self
    @reply_queue = channel.queue(reply_queue_name, durable: true)

    reply_queue.subscribe do |_delivery_info, properties, payload|
      if properties[:correlation_id] == that.call_id
        that.response = payload
        that.lock.synchronize { that.condition.signal }
      end
    end
  end

  def call
    @call_id = "NAIVE_RAND_#{rand}#{rand}#{rand}"
    exchange.publish(params.to_json,
                     routing_key: server_queue_name,
                     correlation_id: call_id,
                     reply_to: reply_queue.name)
    lock.synchronize { condition.wait(lock) }
    connection.close
    response
  end

  def channel
    @channel ||= connection.create_channel
  end

  def connection
    @connection ||= Bunny.new.tap { |c| c.start }
  end
end

RPCServer 服务,使用这个要点这里的评论是我问题的“肉”

# app/workers/posts_worker.rb
require 'sneakers'
require 'sneakers/runner'
require 'byebug'
require 'oj'

class RpcServer
  include Sneakers::Worker
  from_queue 'Client.Server.queue.v1/filters/posts', durable: true, env: nil

  def work_with_params(deserialized_msg, delivery_info, metadata)
    post = {}
    p "ActiveRecord::Base.connected?: #{ActiveRecord::Base.connected?}" # => true
    ##### This gets logged   
    Rails.logger.info "ActiveRecord::Base.connection_pool: #{ActiveRecord::Base.connection_pool}\n\n-------"
    ##### This never gets logged
    Rails.logger.info "ActiveRecord::Base.connection_pool.with_connection: #{ActiveRecord::Base.connection_pool.with_connection}\n\n--------" 

    ### interpreter never reaches this place when ActiveRecord methods like `with_connection`, `where`, `count` etc. are used
    ActiveRecord::Base.connection_pool.with_connection do
      post = Post.first.to_json
    end

    ##### first commented `publish()` works fine and RPC works when no ActiveRecord is involved (this is, assuming above code using ActiveRecord is commented out)
    ##### second publish is not working
    # publish("response from RPCServer", {
    publish(post.to_json, {      
      to_queue: metadata[:reply_to],
      correlation_id: metadata[:correlation_id],
      content_type: metadata[:content_type]
    })

    ack!
  end
end

Sneakers::Runner.new([RpcServer]).run

RPCServer运动鞋配置:

# config/initializers/sneakers.rb
Sneakers.configure({
  amqp: "amqp://guest:guest@localhost:5672",
  vhost: '/',
  workers: 4,
  log: 'log/sneakers.log',
  pid_path: "tmp/pids/sneakers.pid",
  timeout_job_after: 5,
  prefetch: 10,
  threads: 10,
  durable: true,
  ack: true,
  heartbeat: 2,
  exchange: "",
  hooks: {
    before_fork: -> {
       Rails.logger.info('Worker: Disconnect from the database')
       ActiveRecord::Base.connection_pool.disconnect!
       Rails.logger.info("before_fork: ActiveRecord::Base.connected?: #{ActiveRecord::Base.connected?}") # => false
    },
    after_fork: -> {
      ActiveRecord::Base.connection
      Rails.logger.info("after_fork: ActiveRecord::Base.connected?: #{ActiveRecord::Base.connected?}") # => true
      Rails.logger.info('Worker: Reconnect to the database')
    },
    timeout_job_after: 60
})
Sneakers.logger.level = Logger::INFO

RPCServer puma 配置:

# config/puma.rb
threads_count = ENV.fetch("RAILS_MAX_THREADS") { 5 }
threads threads_count, threads_count

port        ENV.fetch("PORT") { 3000 }
environment ENV.fetch("RAILS_ENV") { "development" }
workers ENV.fetch("WEB_CONCURRENCY") { 2 }

preload_app!

### tried and did not work
# on_worker_boot do
#   ActiveSupport.on_load(:active_record) do
#     ActiveRecord::Base.establish_connection
#   end
# end

before_fork do |server, worker|
  # other settings
  if defined?(ActiveRecord::Base)
    ActiveRecord::Base.connection.disconnect!
  end
end

after_worker_boot do |server, worker|
  if defined?(ActiveRecord::Base)
    ActiveRecord::Base.establish_connection
  end
end

plugin :tmp_restart

为了完整起见,我还有一个外部 Rakefile 将队列绑定到交换(在这种情况下可能不重要)

namespace :rabbitmq do
  desc "Setup routing"
  task :setup do
    conn = start_bunny

    rpc_route service: :blog, from: 'v1/filters/posts_mappings', to: 'v1/filters/posts'

    conn.close
  end

  def rpc_route(service:, from:, to:)
    ...
  end

  def start_bunny
    ...
  end
end

我尝试了许多球鞋配置,以及许多启动rabbitmq、重置它、删除队列、连接等的命令。所有这些都很难在这里列出,可能不是这样。

为什么我无法连接到数据库或执行 ActiveRecord 方法?我错过了什么?

4

1 回答 1

1

好,我知道了。问题是 RPCServer 中的最后一行工作人员:

Sneakers::Runner.new([RpcServer]).run

它在 Rails 应用程序之外运行 worker。对此发表评论解决了我的工人无法查询数据库的问题。

于 2019-09-06T16:19:38.417 回答