7

我想编写一个 Kiba Etl 脚本,它有一个从 CSV 到目标 CSV 的源,其中有一个转换规则列表,其中第二个转换器是一个聚合,其中的操作如选择名称、按名称求和(欧元)组

Kiba ETL 脚本文件

source CsvSource, 'users.csv', col_sep: ';', headers: true, header_converters: :symbol

transform VerifyFieldsPresence, [:name, :euro]

transform AggregateFields, { sum: :euro, group_by: :name}

transform RenameField,from: :euro, to: :total_amount

destination CsvDestination, 'result.csv', [:name, :total_amount]

用户.csv

date;euro;name
7/3/2015;10;Jack
7/3/2015;85;Jill
8/3/2015;6;Jack
8/3/2015;12;Jill
9/3/2015;99;Mack

result.csv(预期结果)

total_amount;name
16;Jack
97;Jill
99;Mack

由于 etl 转换器一次在一行上一个接一个地执行,但是我的第二个转换器行为取决于整个行集合,我无法在传递给转换方法的类中访问它。

transform AggregateFields, { sum: :euro, group_by: :name }

是否有可能使用 kiba gem 实现这种行为
提前谢谢你

4

1 回答 1

9

编辑:现在是 2020 年,Kiba ETL v3 包含了一种更好的方法来做到这一点。查看这篇文章https://thibautbarrere.com/2020/03/05/new-in-kiba-etl-v3了解所有相关信息。

Kiba作者在这里!您可以通过多种不同的方式实现这一点,主要取决于数据大小和您的实际需求。这里有几种可能性。

在 Kiba 脚本中使用变量进行聚合

require 'awesome_print'

transform do |r|
  r[:amount] = BigDecimal.new(r[:amount])
  r
end

total_amounts = Hash.new(0)

transform do |r|
  total_amounts[r[:name]] += r[:amount]
  r
end

post_process do
  # pretty print here, but you could save to a CSV too
  ap total_amounts
end

这是最简单的方法,但非常灵活。

不过,它会将您的聚合保存在内存中,因此这可能足够好或不够好,具体取决于您的场景。请注意,目前 Kiba 是单线程的(但“Kiba Pro”将是多线程的),因此目前不需要为聚合添加锁或使用线程安全结构。

从 post_process 块调用 TextQL

另一种快速简便的聚合方法是首先生成一个非聚合的 CSV 文件,然后利用TextQl实际进行聚合,如下所示:

destination CsvSource, 'non-aggregated-output.csv', [:name, :amount]

post_process do
  query = <<SQL
    select
      name,
      /* apparently sqlite has reduced precision, round to 2 for now */
      round(sum(amount), 2) as total_amount
    from tbl group by name
SQL

  textql('non-aggregated-output.csv', query, 'aggregated-output.csv')
end

定义了以下助手:

def system!(cmd)
  raise "Failed to run command #{command}" unless system(command)
end

def textql(source_file, query, output_file)
  system! "cat #{source_file} | textql -header -output-header=true -sql \"#{query}\" > #{output_file}"
  # this one uses csvfix to pretty print the table
  system! "cat #{output_file} | csvfix ascii_table"
end

但是在进行计算时要注意精度。

编写内存聚合目的地

一个可以在这里工作的有用技巧是用一个类包装给定的目的地来进行聚合。这是它的样子:

class InMemoryAggregate
  def initialize(sum:, group_by:, destination:)
    @aggregate = Hash.new(0)
    @sum = sum
    @group_by = group_by
    # this relies a bit on the internals of Kiba, but not too much
    @destination = destination.shift.new(*destination)
  end

  def write(row)
    # do not write, but count here instead
    @aggregate[row[@group_by]] += row[@sum]
  end

  def close
    # use close to actually do the writing
    @aggregate.each do |k,v|
      # reformat BigDecimal additions here
      value = '%0.2f' % v
      @destination.write(@group_by => k, @sum => value)
    end
    @destination.close
  end
end

您可以这样使用:

# convert your string into an actual number
transform do |r|
  r[:amount] = BigDecimal.new(r[:amount])
  r
end

destination CsvDestination, 'non-aggregated.csv', [:name, :amount]

destination InMemoryAggregate,
  sum: :amount, group_by: :name,
  destination: [
    CsvDestination, 'aggregated.csv', [:name, :amount]
  ]

post_process do
  system!("cat aggregated.csv | csvfix ascii_table")
end

这个版本的好处是你可以将你的聚合器重用于不同的目的地(比如一个数据库,或其他任何东西)。

请注意,这会将所有聚合保留在内存中,就像第一个版本一样。

插入具有聚合功能的商店

另一种方法(如果您有非常大的容量特别有用)是将结果数据发送到能够为您聚合数据的东西中。它可以是常规的 SQL 数据库、Redis 或任何更花哨的东西,然后您可以根据需要对其进行查询。

正如我所说,实施将在很大程度上取决于您的实际需求。希望你能在这里找到适合你的东西!

于 2015-06-30T23:01:15.143 回答