71

我正在尝试使用 ActiveRecord 的find_each方法运行约 50,000 条记录的查询,但它似乎忽略了我的其他参数,如下所示:

Thing.active.order("created_at DESC").limit(50000).find_each {|t| puts t.id }

我不想在 50,000 处停止并按 排序,而是在整个created_at数据集上执行的结果查询:

Thing Load (198.8ms)  SELECT "things".* FROM "things" WHERE "things"."active" = 't' AND ("things"."id" > 373343) ORDER BY "things"."id" ASC LIMIT 1000

有没有办法获得类似的行为,find_each但有一个总最大限制并尊重我的排序标准?

4

13 回答 13

72

文档说 find_each 和 find_in_batches 不保留排序顺序和限制,因为:

  • PK 上的排序 ASC 用于使批量排序工作。
  • 限制用于控制批量大小。

你可以像@rorra 那样编写你自己的这个函数版本。但是在改变对象时可能会遇到麻烦。例如,如果您按 created_at 排序并保存对象,它可能会在下一批中再次出现。同样,您可能会跳过对象,因为在执行查询以获取下一批时结果的顺序发生了变化。仅将该解决方案与只读对象一起使用。

现在我主要担心的是我不想一次将 30000 多个对象加载到内存中。我关心的不是查询本身的执行时间。因此,我使用了一个执行原始查询但只缓存 ID 的解决方案。然后它将 ID 数组划分为块并查询/创建每个块的对象。通过这种方式,您可以安全地改变对象,因为排序顺序保存在内存中。

这是一个类似于我所做的最小示例:

batch_size = 512
ids = Thing.order('created_at DESC').pluck(:id) # Replace .order(:created_at) with your own scope
ids.each_slice(batch_size) do |chunk|
    Thing.find(chunk, :order => "field(id, #{chunk.join(',')})").each do |thing|
      # Do things with thing
    end
end

此解决方案的权衡是:

  • 执行完整的查询以获取 ID
  • 所有 ID 的数组都保存在内存中
  • 使用 MySQL 特定的 FIELD() 函数

希望这可以帮助!

于 2013-11-06T17:21:12.457 回答
27

find_each在后台使用find_in_batches

无法选择记录的顺序,如find_in_batches中所述,自动将主键(“id ASC”)设置为升序以使批处理排序工作。

但是,应用了标准,您可以做的是:

Thing.active.find_each(batch_size: 50000) { |t| puts t.id }

关于限制,还没有实现: https ://github.com/rails/rails/pull/5696


回答您的第二个问题,您可以自己创建逻辑:

total_records = 50000
batch = 1000
(0..(total_records - batch)).step(batch) do |i|
  puts Thing.active.order("created_at DESC").offset(i).limit(batch).to_sql
end
于 2013-03-03T20:11:12.893 回答
18

检索第ids一个并处理in_groups_of

ordered_photo_ids = Photo.order(likes_count: :desc).pluck(:id)

ordered_photo_ids.in_groups_of(1000, false).each do |photo_ids|
  photos = Photo.order(likes_count: :desc).where(id: photo_ids)

  # ...
end

ORDER BY将查询添加到内部调用也很重要。

于 2014-07-23T17:58:23.420 回答
4

一种选择是将为您的特定模型量身定制的实现放入模型本身(说到这,id通常是订购记录的更好选择,created_at可能有重复):

class Thing < ActiveRecord::Base
  def self.find_each_desc limit
    batch_size = 1000
    i = 1
    records = self.order(created_at: :desc).limit(batch_size)
    while records.any?
      records.each do |task|
        yield task, i
        i += 1
        return if i > limit
      end
      records = self.order(created_at: :desc).where('id < ?', records.last.id).limit(batch_size)
    end
  end
end

或者你可以概括一下,让它适用于所有模型:

lib/active_record_extensions.rb

ActiveRecord::Batches.module_eval do
  def find_each_desc limit
    batch_size = 1000
    i = 1
    records = self.order(id: :desc).limit(batch_size)
    while records.any?
      records.each do |task|
        yield task, i
        i += 1
        return if i > limit
      end
      records = self.order(id: :desc).where('id < ?', records.last.id).limit(batch_size)
    end
  end
end

ActiveRecord::Querying.module_eval do
  delegate :find_each_desc, :to => :all
end

config/initializers/extensions.rb

require "active_record_extensions"

PS我根据这个答案将代码放入文件中。

于 2015-02-25T07:28:42.847 回答
4

Rails 6.1 增加find_each,find_in_batchesin_batches.

于 2020-06-29T08:01:46.590 回答
3

您可以通过标准 ruby​​ 迭代器向后迭代:

Thing.last.id.step(0,-1000) do |i|
  Thing.where(id: (i-1000+1)..i).order('id DESC').each do |thing|
    #...
  end
end

注意:+1是因为 BETWEEN 将在查询中包含两个边界,但我们只需要包含一个。

当然,使用这种方法可以批量获取少于 1000 条记录,因为其中一些已被删除,但在我的情况下这没问题。

于 2016-07-20T23:20:33.997 回答
3

正如@Kirk 在其中一条评论中所说,从版本5.1.0find_each开始支持。limit

变更日志中的示例:

Post.limit(10_000).find_each do |post|
  # ...
end

文档说:

遵守限制,如果存在,则对批量大小没有要求:它可以小于、等于或大于限制。

(虽然仍然不支持设置自定义订单)

于 2019-01-10T15:25:47.370 回答
2

我一直在寻找相同的行为并想出了这个解决方案。这不是由 created_at 订购的,但我想我还是会发布。

max_records_to_retrieve = 50000
last_index = Thing.count
start_index = [(last_index - max_records_to_retrieve), 0].max
Thing.active.find_each(:start => start_index) do |u|
    # do stuff
end

这种方法的缺点: - 您需要 2 个查询(第一个应该很快) - 这可以保证最多 50K 记录,但如果跳过 id,您将获得更少。

于 2014-06-29T02:09:32.877 回答
2

您可以尝试ar-as-batches Gem。

从他们的文档中,您可以执行以下操作

Users.where(country_id: 44).order(:joined_at).offset(200).as_batches do |user|
  user.party_all_night!
end
于 2016-08-31T21:11:13.397 回答
0

使用Kaminari或其他东西会很容易。

创建批处理加载器类。

module BatchLoader
  extend ActiveSupport::Concern

  def batch_by_page(options = {})
    options = init_batch_options!(options)

    next_page = 1

    loop do
      next_page = yield(next_page, options[:batch_size])

      break next_page if next_page.nil?
    end
  end

  private

  def default_batch_options
    {
      batch_size: 50
    }
  end

  def init_batch_options!(options)
    options ||= {}
    default_batch_options.merge!(options)
  end
end

创建存储库

class ThingRepository
  include BatchLoader

  # @param [Integer] per_page
  # @param [Proc] block
  def batch_changes(per_page=100, &block)
    relation = Thing.active.order("created_at DESC")

    batch_by_page do |next_page|
      query = relation.page(next_page).per(per_page)
      yield query if block_given?
      query.next_page
    end
  end
end

使用存储库

repo = ThingRepository.new
repo.batch_changes(5000).each do |g|
  g.each do |t|
    #...
  end
end
于 2018-10-17T13:35:50.430 回答
0

添加 find_in_batches_with_order 确实解决了我的用例,我已经有 id 但需要批处理和排序。它的灵感来自@dirk-geurs 解决方案

# Create file config/initializers/find_in_batches_with_order.rb with follwing code.
ActiveRecord::Batches.class_eval do
  ## Only flat order structure is supported now
  ## example: [:forename, :surname] is supported but [:forename, {surname: :asc}] is not supported
  def find_in_batches_with_order(ids: nil, order: [], batch_size: 1000)
    relation = self
    arrangement = order.dup
    index = order.find_index(:id)

    unless index
      arrangement.push(:id)
      index = arrangement.length - 1
    end

    ids ||= relation.order(*arrangement).pluck(*arrangement).map{ |tupple| tupple[index] }
    ids.each_slice(batch_size) do |chunk_ids|
      chunk_relation = relation.where(id: chunk_ids).order(*order)
      yield(chunk_relation)
    end
  end
end

在这里留下要点https://gist.github.com/the-spectator/28b1176f98cc2f66e870755bb2334545

于 2019-10-18T12:41:00.130 回答
0

DISTINCT ON我在查询需要该字段的位置时遇到了同样的问题ORDER BY,所以这是我使用 Postgres 的方法:

def filtered_model_ids
  Model.joins(:father_model)
       .select('DISTINCT ON (model.field) model.id')
       .order(:field)
       .map(&:id)
end

def processor
  filtered_model_ids.each_slice(BATCH_SIZE).lazy.each do |batch|
    Model.find(batch).each do |record|
      # Code
    end
  end
end
于 2020-04-17T19:45:39.637 回答
-1

在一个查询中执行并避免迭代:

User.offset(2).order('name DESC').last(3)

将产生这样的查询

SELECT "users".* FROM "users" ORDER BY name ASC LIMIT $1 OFFSET $2 [["LIMIT", 3], ["OFFSET", 2]

于 2017-09-11T08:44:36.303 回答