0

假设我需要为 100 个用户进行复杂的计算。我当前的配置如下所示:

制片人

class Producer
  class << self
    def publish(target, options = {})
      connection = Bunny.new(some_params).start
      channel    = connection.create_channel
      exchange   = channel.fanout("#{target}_exchange", durable: true)

      exchange.publish(options.to_json)
    end
  end
end

MassComplexCalculations 工作者

module UsersWorkers
  class MassComplexCalculations
    include Sneakers::Worker

    from_queue "#{ENV['RAILS_ENV']}.users.mass_complex_calculations_queue",
               exchange: "#{ENV['RAILS_ENV']}.users.mass_complex_calculations_exchange"

    def work(options)
      parsed_options = JSON.parse(options)

      ActiveRecord::Base.connection_pool.with_connection do
        User.where(id: parsed_options['ids']).each do |user|
          ::Services::Users::ComplexCalculations.call(user)
        end
      end
      ack!
    end
  end
end

运行工人

Producer.publish("#{ENV['RAILS_ENV']}.users.mass_complex_calculations", ids: User.limit(100).ids)

我不太了解 AMQP 如何分配资源来执行任务以及如何提供帮助。对吗,在单独的工作人员中运行每个计算会更好吗?例如:

更改 MassComplexCalculations 工作者

module UsersWorkers
  class MassComplexCalculations
    include Sneakers::Worker

    from_queue "#{ENV['RAILS_ENV']}.users.mass_complex_calculations_queue",
               exchange: "#{ENV['RAILS_ENV']}.users.mass_complex_calculations_exchange"

    def work(options)
      parsed_options = JSON.parse(options)

      ActiveRecord::Base.connection_pool.with_connection do
        parsed_options['ids'].each do |id|
          Producer.publish("#{ENV['RAILS_ENV']}.users.personal_complex_calculations", id: id)
        end
      end
      ack!
    end
  end
end

新的 PersonalComplexCalculations 工作者

module UsersWorkers
  class PersonalComplexCalculations
    include Sneakers::Worker

    from_queue "#{ENV['RAILS_ENV']}.users.personal_complex_calculations_queue",
               exchange: "#{ENV['RAILS_ENV']}.users.personal_complex_calculations_exchange"

    def work(options)
      parsed_options = JSON.parse(options)
      user           = User.find(parsed_options['id'])

      ActiveRecord::Base.connection_pool.with_connection do
        ::Services::Users::ComplexCalculations.call(user)
      end
      ack!
    end
  end
end

据我了解,可能有两种选择:

  1. 第一个实现可能会运行得更慢,因为它会按顺序为每个用户调用服务,而在第二个选项中,我们将有 100 个同时工作的工作人员,他们将并行工作
  2. 没有区别

那么哪种方法更好呢?或者甚至其中一个是完全错误的?

提前致谢。

4

1 回答 1

0

你的两个假设都不成立。您不能保证有 100 个并行工作人员,因为运动鞋有一个您不一定要覆盖的默认线程池大小:

https://github.com/jondot/sneakers/blob/master/lib/sneakers/worker.rb#L20

如果你没有配置至少 100 个连接的 ActiveRecord 连接池,你的代码也会因为这里的资源匮乏而阻塞和等待。

在一般情况下,大多数时候并行执行此类任务可能会更快- 但这不能保证。

于 2018-10-31T14:17:13.377 回答