0

我有一个 kiba 作业,它需要一个 CSV 文件(使用Kiba::Common::Sources::CSV),丰富其数据,合并一些行(使用此处ChainableAggregateDestination描述的目标)并将其保存到另一个 CSV 文件(使用)。Kiba::Common::Destinations::CSV

现在,我想对目标 CSV 中的行进行不同的排序(基于第一列)。我找不到一种方法来编写执行此操作的转换。我可以post_process用来重新打开目标 CSV,对其进行排序并重写它,但我想有一种更清洁的方法......

有人可以指出我正确的方向吗?

4

1 回答 1

1

要对行进行排序,一个好的策略是使用“聚合转换”,如本文所述,将所有行存储在内存中(尽管您可以在内存中进行),然后在转换“关闭”时对它们进行排序并在管道中重新发射它们。

这是 IMO 最灵活的设计。

class SortingTransform
  def initialize(config...)
    @rows = []
  end

  def process(row)
    @rows << row
    nil # do not emit rows right away
  end

  def close
    # Here: sort the rows, optionally using external
    # configuration passed at init time
    @rows.sort_by { ... }.each do |row|
      yield row
    end
  end
end

您确实也可以在辅助 ETL 作业中重新打开输出并对其进行排序,但如果第一个解决方案对您有用,我通常更喜欢它。

于 2021-05-14T18:44:45.133 回答