假设我需要为 100 个用户进行复杂的计算。我当前的配置如下所示:
制片人
class Producer
class << self
def publish(target, options = {})
connection = Bunny.new(some_params).start
channel = connection.create_channel
exchange = channel.fanout("#{target}_exchange", durable: true)
exchange.publish(options.to_json)
end
end
end
MassComplexCalculations 工作者
module UsersWorkers
class MassComplexCalculations
include Sneakers::Worker
from_queue "#{ENV['RAILS_ENV']}.users.mass_complex_calculations_queue",
exchange: "#{ENV['RAILS_ENV']}.users.mass_complex_calculations_exchange"
def work(options)
parsed_options = JSON.parse(options)
ActiveRecord::Base.connection_pool.with_connection do
User.where(id: parsed_options['ids']).each do |user|
::Services::Users::ComplexCalculations.call(user)
end
end
ack!
end
end
end
运行工人
Producer.publish("#{ENV['RAILS_ENV']}.users.mass_complex_calculations", ids: User.limit(100).ids)
我不太了解 AMQP 如何分配资源来执行任务以及如何提供帮助。对吗,在单独的工作人员中运行每个计算会更好吗?例如:
更改 MassComplexCalculations 工作者
module UsersWorkers
class MassComplexCalculations
include Sneakers::Worker
from_queue "#{ENV['RAILS_ENV']}.users.mass_complex_calculations_queue",
exchange: "#{ENV['RAILS_ENV']}.users.mass_complex_calculations_exchange"
def work(options)
parsed_options = JSON.parse(options)
ActiveRecord::Base.connection_pool.with_connection do
parsed_options['ids'].each do |id|
Producer.publish("#{ENV['RAILS_ENV']}.users.personal_complex_calculations", id: id)
end
end
ack!
end
end
end
新的 PersonalComplexCalculations 工作者
module UsersWorkers
class PersonalComplexCalculations
include Sneakers::Worker
from_queue "#{ENV['RAILS_ENV']}.users.personal_complex_calculations_queue",
exchange: "#{ENV['RAILS_ENV']}.users.personal_complex_calculations_exchange"
def work(options)
parsed_options = JSON.parse(options)
user = User.find(parsed_options['id'])
ActiveRecord::Base.connection_pool.with_connection do
::Services::Users::ComplexCalculations.call(user)
end
ack!
end
end
end
据我了解,可能有两种选择:
- 第一个实现可能会运行得更慢,因为它会按顺序为每个用户调用服务,而在第二个选项中,我们将有 100 个同时工作的工作人员,他们将并行工作
- 没有区别
那么哪种方法更好呢?或者甚至其中一个是完全错误的?
提前致谢。