6

在我们的 Rails 3.2.13 应用程序(Heroku 上的 Ruby 2.0.0 + Postgres)中,我们经常从 API 检索大量订单数据,然后需要在数据库中更新或创建每个订单及其关联对象。每个订单除了创建/更新自身之外,还要创建/更新大约 10-15 个关联对象,而我们一次最多导入 500 个订单。

下面的代码可以工作,但问题是它在速度方面非常低效。创建/更新 500 条记录大约需要 1 分钟,并会产生 6500 多次数据库查询!

# Imports a batch of Shopify orders for +shop+.
#
# For each Shopify order this finds or creates the Order row matching the
# Shopify id and the shop, refreshes the order's own attributes, and then
# hands each nested collection / object to the add_details class method of
# the corresponding model.
#
# shop           - object responding to #id; scopes the Order lookup.
# shopify_orders - enumerable of Shopify order objects.
#
# Returns the result of shopify_orders.each.
def add_details(shop, shopify_orders)
  shopify_orders.each do |remote|
    order = Order.where(:order_id => remote.id.to_s, :shop_id => shop.id).first_or_create
    order.update_details(order, remote, shop) # This calls update_attributes for the Order

    # Collections attached to the order.
    ShippingLine.add_details(order, remote.shipping_lines)
    LineItem.add_details(order, remote.line_items)
    Taxline.add_details(order, remote.tax_lines)
    Fulfillment.add_details(order, remote.fulfillments)
    Note.add_details(order, remote.note_attributes)
    Discount.add_details(order, remote.discount_codes)

    # Singular sub-objects: an accessor that raises is treated as "absent".
    billing = (remote.billing_address rescue nil)
    BillingAddress.add_details(order, billing) unless billing.blank?

    shipping = (remote.shipping_address rescue nil)
    ShippingAddress.add_details(order, shipping) unless shipping.blank?

    payment = (remote.payment_details rescue nil)
    PaymentDetail.add_details(order, payment) unless payment.blank?
  end
end

  # Copies the fields of a Shopify order onto +order+ with a single
  # update_attributes call and returns that call's result.
  #
  # order         - record that receives update_attributes.
  # shopify_order - Shopify order object supplying the values.
  # shop          - object responding to #id; written to :shop_id.
  #
  # NOTE(review): :subtotal_tax and :total_tax are both read from
  # shopify_order.total_tax -- confirm this is intentional.
  def update_details(order, shopify_order, shop)
    src = shopify_order
    attrs = {
      :order_name             => src.name,
      :order_created_at       => src.created_at,
      :order_updated_at       => src.updated_at,
      :status                 => Order.get_status(src),
      :payment_status         => src.financial_status,
      :fulfillment_status     => Order.get_fulfillment_status(src),
      :payment_method         => src.processing_method,
      :gateway                => src.gateway,
      :currency               => src.currency,
      :subtotal_price         => src.subtotal_price,
      :subtotal_tax           => src.total_tax,
      :total_discounts        => src.total_discounts,
      :total_line_items_price => src.total_line_items_price,
      :total_price            => src.total_price,
      :total_tax              => src.total_tax,
      :total_weight           => src.total_weight,
      :taxes_included         => src.taxes_included,
      :shop_id                => shop.id,
      :email                  => src.email,
      :order_note             => src.note
    }
    order.update_attributes(attrs)
  end

如您所见,我们正在遍历每个订单,确定它是否存在(然后加载现有订单或创建新订单),然后调用 update_attributes 传递订单的详细信息。之后,我们创建或更新每个关联。每个关联的模型看起来都与此非常相似:

  class << self
    # For every Shopify tax line, finds or creates the Taxline row for +order+
    # and passes the tax line to that row's update_details.
    #
    # order     - object responding to #id.
    # tax_lines - enumerable of Shopify tax line objects.
    #
    # NOTE(review): the lookup is keyed on order_id only, so several tax lines
    # on one order appear to resolve to the same row -- confirm intent.
    def add_details(order, tax_lines)
      tax_lines.each do |remote_line|
        Taxline.find_or_create_by_order_id(:order_id => order.id).update_details(remote_line)
      end
    end
  end
  # Copies price, rate and title from +tax_line+ onto this record through
  # update_attributes and returns that call's result.
  def update_details(tax_line)
    update_attributes(
      :price => tax_line.price,
      :rate  => tax_line.rate,
      :title => tax_line.title
    )
  end

我查看了 activerecord-import gem,但不幸的是,它似乎更适合批量创建记录,而不是我们也需要的更新。

可以提高性能的最佳方法是什么?

提前致谢。

更新:

我想出了这个小改进:对于新创建的订单,它基本上省去了随后的更新调用(每个订单少一次查询)。

 # Imports a batch of Shopify orders for +shop+.
 #
 # Builds the full attribute hash for each order up front, then either creates
 # the Order with it or updates the first existing match, so a new order does
 # not need a separate update afterwards. Nested collections / objects are then
 # passed to the add_details class method of the corresponding model.
 #
 # shop           - object responding to #id; scopes the Order lookup.
 # shopify_orders - enumerable of Shopify order objects.
 #
 # Returns the result of shopify_orders.each.
 def add_details(shop, shopify_orders)
   shopify_orders.each do |remote|
     values = {
       :order_id               => remote.id.to_s,
       :shop_id                => shop.id,
       :order_name             => remote.name,
       :order_created_at       => remote.created_at,
       :order_updated_at       => remote.updated_at,
       :status                 => Order.get_status(remote),
       :payment_status         => remote.financial_status,
       :fulfillment_status     => Order.get_fulfillment_status(remote),
       :payment_method         => remote.processing_method,
       :gateway                => remote.gateway,
       :currency               => remote.currency,
       :subtotal_price         => remote.subtotal_price,
       :subtotal_tax           => remote.total_tax,
       :total_discounts        => remote.total_discounts,
       :total_line_items_price => remote.total_line_items_price,
       :total_price            => remote.total_price,
       :total_tax              => remote.total_tax,
       :total_weight           => remote.total_weight,
       :taxes_included         => remote.taxes_included,
       :email                  => remote.email,
       :order_note             => remote.note
     }

     matches = Order.where(:order_id => remote.id.to_s, :shop_id => shop.id)
     order =
       if matches.blank?
         Order.create(values)
       else
         matches.first.tap { |existing| existing.update_attributes(values) }
       end

     # Collections attached to the order.
     ShippingLine.add_details(order, remote.shipping_lines)
     LineItem.add_details(order, remote.line_items)
     Taxline.add_details(order, remote.tax_lines)
     Fulfillment.add_details(order, remote.fulfillments)
     Note.add_details(order, remote.note_attributes)
     Discount.add_details(order, remote.discount_codes)

     # Singular sub-objects: an accessor that raises is treated as "absent".
     billing = (remote.billing_address rescue nil)
     BillingAddress.add_details(order, billing) unless billing.blank?

     shipping = (remote.shipping_address rescue nil)
     ShippingAddress.add_details(order, shipping) unless shipping.blank?

     payment = (remote.payment_details rescue nil)
     PaymentDetail.add_details(order, payment) unless payment.blank?
   end
 end

对于关联的对象:

  class << self
    # Creates or updates the Taxline row for +order+ from each Shopify tax line.
    #
    # order     - object responding to #id.
    # tax_lines - enumerable of objects responding to #price, #rate and #title.
    #
    # Returns the result of tax_lines.each.
    #
    # NOTE(review): the lookup is keyed on order_id only, so several tax lines
    # on one order resolve to the same row -- confirm intent.
    def add_details(order, tax_lines)
      tax_lines.each do |shopify_tax_line|
        # The values must be read from the block parameter; the previous code
        # referenced an undefined local named tax_line.
        values = {
          :order_id => order.id,
          :price    => shopify_tax_line.price,
          :rate     => shopify_tax_line.rate,
          :title    => shopify_tax_line.title
        }

        # One lookup decides between update and create.
        existing = Taxline.where(:order_id => order.id).first
        if existing
          existing.update_attributes(values)
        else
          Taxline.create(values)
        end
      end
    end
  end

有更好的建议吗?

4

3 回答 3

7

尝试将整个代码包装到单个数据库事务中。由于您在 Heroku 上,后端数据库是 Postgres。有了这么多更新语句,把它们放在一个事务里一次性处理可能会让您受益匪浅:您的代码执行得更快,基本上只是在 Postgres 端留下一个包含 6500 条语句的“队列”,由服务器按自己的处理能力逐条出列执行。根据后端的情况,您可能需要分成更小的批次来处理——但即使一次只处理 100 个(然后关闭并重新打开事务),也会大大提高 Pg 的吞吐量。

http://api.rubyonrails.org/classes/ActiveRecord/Transactions/ClassMethods.html http://www.postgresql.org/docs/9.2/static/sql-set-transaction.html

因此,在第 2 行之前,您需要添加如下内容:

# Opening lines of add_details with the per-order loop wrapped in an
# Order.transaction block.
def add_details(shop, shopify_orders)
  Order.transaction do
    shopify_orders.each do |shopify_order|

然后在方法的最后添加另一端:

      if !payment_details.blank?
        PaymentDetail.add_details(order, payment_details)
      end
    end //shopify_orders.each..
  end //Order.transaction..
end //method
于 2013-09-26T20:14:45.200 回答
1

您可以像这样对 ActiveRecord 进行猴子补丁:

class ActiveRecord::Base

  #http://stackoverflow.com/questions/15317837/bulk-insert-records-into-active-record-table?lq=1
  #https://gist.github.com/jackrg/76ade1724bd816292e4e
  #  "UPDATE THIS SET <list_of_column_assignments>  FROM <table_name> THIS  JOIN (VALUES (<csv1>, <csv2>,...) VALS ( <column_names> ) ON <list_of_primary_keys_comparison>"

  # Updates many rows with one UPDATE ... JOIN (VALUES ...) statement per batch.
  #
  # record_list - Array of Hashes (string or symbol keys); each hash must
  #               contain a non-blank value for every primary key column.
  # batch_size  - maximum number of records sent per statement (default 1000).
  #
  # Returns nil for an empty list, otherwise the value returned by
  # connection.execute for the last batch executed.
  # Raises RuntimeError when the model has no primary key, when record_list is
  # not an Array of Hashes, or when a record lacks a primary key value.
  #
  # NOTE(review): convert_record_list appends created_at when the caller did
  # not supply it, so created_at is overwritten by this update -- confirm.
  # NOTE(review): the statement shape (UPDATE alias ... FROM table alias JOIN)
  # is not accepted by every database; verify against the target adapter.
  def self.bulk_update(record_list, batch_size = 1000)
    pk = self.primary_key
    raise "primary_key not found" unless pk.present?

    raise "record_list not an Array of Hashes" unless record_list.is_a?(Array) && record_list.all? { |rec| rec.is_a?(Hash) }
    return nil if record_list.empty?

    # Every hash needs its primary keys, otherwise the JOIN cannot match rows.
    record_list.each do |r|
      raise "Primary Keys '#{self.primary_key.to_s}' not found on record: #{r}" unless hasAllPKs?(r)
    end

    # Array() covers both a single key name and a list of key names.
    pk_comparison = Array(pk).map { |key| "THIS.#{key} = VALS.#{key}" }.join(' AND ')

    result = nil

    # each_slice yields disjoint batches; the previous index arithmetic
    # (start-1..start+999) produced 1001-row slices that overlapped by one.
    record_list.each_slice(batch_size) do |batch|
      key_list, value_list = convert_record_list(batch)

      csv_vals = value_list.map { |v| "(#{v.join(", ")})" }.join(", ")
      column_names = key_list.join(", ")
      columns_assign = key_list.reject { |col| inPK?(col) }
                               .map { |col| "THIS.#{col} = VALS.#{col}" }
                               .join(', ')

      sql = "UPDATE THIS SET #{columns_assign}  FROM #{self.table_name} THIS  JOIN ( VALUES #{csv_vals} ) VALS ( #{column_names} ) ON ( #{pk_comparison} )"
      result = self.connection.execute(sql)

      # Only compare when execute returned a number; other return types
      # would raise on `<`. NOTE(review): return type depends on the adapter.
      return result if result.is_a?(Numeric) && result < 0
    end

    result
  end

  # Returns true when +str+ (compared as a String) names a primary key column.
  def self.inPK?(str)
    Array(self.primary_key).include?(str.to_s)
  end

  #test if given hash has primary keys included as hash keys and those keys are not empty
  def self.hasAllPKs?(hash)
    h = hash.stringify_keys
    pk = self.primary_key

    if pk.is_a?(Array)
      pk.all? { |k| h.key?(k) && h[k].present? }
    else
      h.key?(pk) && h[pk].present?
    end
  end

  # Converts an Array of Hashes into [key_list, value_list].
  #
  # key_list   - sorted, de-duplicated String column names found in any record,
  #              followed by created_at / updated_at when the table has those
  #              columns and the records did not supply them.
  # value_list - one Array of SQL-quoted values per record, in key_list order.
  def self.convert_record_list(record_list)
    # Build the list of keys
    key_list = record_list.map(&:keys).flatten.map(&:to_s).uniq.sort
    conn = ActiveRecord::Base.connection

    value_list = record_list.map do |rec|
      key_list.map do |key|
        # Look the key up explicitly so a false stored under a String key is
        # not replaced by the (missing) Symbol key's nil.
        conn.quote(rec.key?(key) ? rec[key] : rec[key.to_sym])
      end
    end

    # If table has standard timestamps and they're not in the record list then add them to the record list
    time = conn.quote(Time.now)
    %w(created_at updated_at).each do |field_name|
      if self.column_names.include?(field_name) && !key_list.include?(field_name)
        key_list << field_name
        value_list.each { |rec| rec << time }
      end
    end

    [key_list, value_list]
  end
end

然后,您可以生成包含模型属性(包括它们的主键)的哈希数组并执行以下操作:

# Every hash must carry the model's primary key; bulk_update raises otherwise.
ActiveRecord::Base.transaction do
   Model.bulk_update [ {attr1: val1, attr2: val2,...},  {attr1: val1, attr2: val2,...},   ... ]
end

这将是一个没有 Rails 回调和验证的 SQL 命令。

于 2014-08-21T15:20:48.603 回答
0

对于 PostgreSQL,上面的方法没有解决几个问题:

  1. 您必须在 UPDATE 的目标位置指定实际的表名,而不能只写别名。
  2. 您不能在 FROM 子句中重复目标表。由于您要将目标表与 VALUES 表连接(因此 FROM 子句中只有一个表),您将无法使用 JOIN,而必须使用“WHERE”。
  3. VALUES 表中的值不会像在简单的“UPDATE”命令中那样获得“免费”的类型转换,因此您必须像下面这样显式转换日期/时间戳值(#val_cast 执行此操作)。

    class ActiveRecord::Base

      # Updates many rows with one UPDATE ... FROM (VALUES ...) statement per
      # batch, matching rows on the primary key.
      #
      # record_list - Array of Hashes (string or symbol keys) that include the
      #               primary key of the row to update.
      # batch_size  - maximum number of records sent per statement (default 1000).
      #
      # Returns record_list.
      # Raises ArgumentError when record_list is not an Array of Hashes.
      def self.update!(record_list, batch_size = 1000)
        # The comma is required: without it Ruby parses ArgumentError("...") as
        # a method call and raises NoMethodError instead.
        raise ArgumentError, "record_list not an Array of Hashes" unless record_list.is_a?(Array) && record_list.all? { |rec| rec.is_a?(Hash) }
        return record_list if record_list.empty?

        key_field = self.primary_key

        # each_slice yields disjoint batches; the previous index arithmetic
        # (start-1..start+999) produced 1001-row slices that overlapped by one.
        record_list.each_slice(batch_size) do |batch|
          field_list, value_list = convert_record_list(batch)
          # Never assign the primary key or created_at.
          non_key_fields = field_list - [%Q["#{key_field}"], %Q["created_at"]]
          columns_assign = non_key_fields.map { |field| "#{field} = #{val_cast(field)}" }.join(",")
          value_table = value_list.map { |row| "(#{row.join(", ")})" }.join(", ")
          sql = "UPDATE #{table_name} AS this SET #{columns_assign} FROM (VALUES #{value_table}) vals (#{field_list.join(", ")}) WHERE this.#{key_field} = vals.#{key_field}"
          self.connection.update_sql(sql)
        end

        record_list
      end

      # Returns the SQL expression that reads +field+ from the vals table,
      # wrapped in a cast to the column's sql_type when that type matches
      # /time|date/.
      #
      # field - column name, optionally wrapped in double quotes.
      def self.val_cast(field)
        field = field.gsub('"', '')
        column = columns.find { |c| c.name == field }
        # Guard against names that are not columns of this table.
        if column && column.sql_type =~ /time|date/
          "cast (vals.#{field} as #{column.sql_type})"
        else
          "vals.#{field}"
        end
      end

      # Converts an Array of Hashes into [field_list, value_list].
      #
      # field_list - double-quoted, sorted, de-duplicated column names found in
      #              any record, followed by created_at / updated_at when the
      #              table has those columns and the records did not supply them.
      # value_list - one Array of SQL-quoted values per record, in field_list order.
      def self.convert_record_list(record_list)
        # Build the list of fields
        field_list = record_list.map(&:keys).flatten.map(&:to_s).uniq.sort
        conn = ActiveRecord::Base.connection

        value_list = record_list.map do |rec|
          field_list.map do |field|
            # Look the key up explicitly so a false stored under a String key
            # is not replaced by the (missing) Symbol key's nil.
            conn.quote(rec.key?(field) ? rec[field] : rec[field.to_sym])
          end
        end

        # If table has standard timestamps and they're not in the record list then add them to the record list
        time = conn.quote(Time.now)
        %w(created_at updated_at).each do |field_name|
          if self.column_names.include?(field_name) && !field_list.include?(field_name)
            field_list << field_name
            value_list.each { |rec| rec << time }
          end
        end

        field_list.map! { |field| %Q["#{field}"] }

        [field_list, value_list]
      end
    end
    
于 2014-11-22T19:01:53.243 回答