3

我正忙于处理 ETL 管道,但对于这个特殊问题,我需要获取一个数据表,并将每一列转换为一个集合 - 即一个唯一的数组。

我正在努力思考如何在 Kiba 框架内完成此任务。

这是我要实现的目标的本质:

来源

[
  { dairy: "Milk",   protein: "Steak",   carb: "Potatoes" },
  { dairy: "Milk",   protein: "Eggs",    carb: "Potatoes" },
  { dairy: "Cheese", protein: "Steak",   carb: "Potatoes" },
  { dairy: "Cream",  protein: "Chicken", carb: "Potatoes" },
  { dairy: "Milk",   protein: "Chicken", carb: "Pasta" },
]

目的地

{
  dairy:   ["Milk", "Cheese", "Cream"],
  protein: ["Steak", "Eggs", "Chicken"],
  carb:    ["Potatoes", "Pasta"],
}

像这样的事情a)在Kiba可行,b)甚至建议在Kiba做吗?

任何帮助将不胜感激。

更新 - 部分解决。

我找到了部分解决方案。这个转换器类会将行表转换为集合的散列,但我不知道如何使用 ETL 目标获取该数据。我怀疑我正在以一种不打算使用的方式使用 Kiba。

class ColumnSetTransformer
  def initialize
    @col_set = Hash.new(Set.new)
  end

  def process(row)
    row.each do |col, col_val|
      @col_set[col] = @col_set[col] + [col_val]
    end

    @col_set
  end
end 
4

2 回答 2

2

您的解决方案可以正常工作,事实上,在 Kiba 中进行这样的设计(主要是“Plain Old Ruby Objects”)的原因是如果您需要的话,可以轻松地自己调用组件!(这对于测试非常有用!)。

这就是说这里有一些额外的可能性。

您正在做的是一种聚合形式,可以通过多种方式实现。

缓冲目的地

实际上,这里的缓冲区将是单行。使用如下代码:

class MyBufferingDestination
  attr_reader :single_output_row

  def initialize(config:)
    @single_output_row = []
  end

  def write(row)
    row.each do |col, col_val|
      single_output_row[col] += [col_val]
    end
  end

  def close # will be called by Kiba at the end of the run
    # here you'd write your output
  end
end

使用实例变量聚合 + post_process 块

pre_process do
  @output_row = {}
end

transform do |row|
  row.each do |col, col_val|
    @output_row = # SNIP
  end      
  row
end

post_process do
  # convert @output_row to something
  # you can invoke a destination manually, or do something else
end

很快可能:使用缓冲变换

如此处所述,很快就有可能创建缓冲转换,以更好地将聚合机制与目标本身分离。

它会是这样的:

class MyAggregatingTransform
  def process(row)
    @aggregate += xxx
    nil # remove the row from the pipeline
  end

  def close
    # not yet possible, but soon
    yield @aggregate
  end
end

这将是最好的设计,因为这样您就可以重用现有的目的地,而无需修改它们以支持缓冲,因此它们将变得更加通用和可重用:

transform MyAggregatingTransform

destination MyJSONDestination, file: "some.json"

通过检测输入数据集中的边界并相应地产生,甚至可以在目标中拥有多行。

一旦可能,我将更新 SO 答案。

于 2018-03-22T09:57:30.513 回答
0

好的 - 因此,在工作环境中使用 Kiba 似乎并不是该工具的预期用途。我想使用 Kiba 是因为我已经为这个项目实现了很多相关的 E、T 和 L 代码,并且重用性会很大。

所以,如果我有代码要重用,但我不能在 Kiba 框架中使用它,我可以像普通代码一样调用它。这一切都归功于 Thibaut 非常简单的设计!

这是我解决问题的方法:

source  = CSVOrXLSXSource.new("data.xlsx", document_config: { some: :settings })
xformer = ColumnSetTransformer.new

source.each do |row|
  xformer.process(row)
end

p xformer.col_set # col_set must be attr_reader on this class.

现在我可以轻松地转换我的数据:)

于 2018-03-22T09:02:49.547 回答