1

刚开始接触 Kiba,没有发现任何明显的东西,但我可能只是在引导我内心的孩子(他通过盯着天花板寻找他们的鞋子)。

我想将一个非常大的表转储到 Amazon Redshift。似乎最快的方法是将一堆 CSV 文件写入 S3 存储桶,然后告诉 Redshift(通过COPY命令)将它们拉入。神奇的缩放 gremlins 将完成剩下的工作。

所以,我认为我希望 Kiba 为每 10k 行数据编写一个 CSV 文件,然后将其推送到 s3,然后开始写入一个新文件。最后,进行后处理调用COPY

那么,我可以“流水线化”这项工作还是应该是一个大的、嵌套的 Destination 类?

IE

source -> transform -> transform ... -> [ csv -> s3 ]{every 10000}; post-process

4

3 回答 3

0

Thibaut,我做了类似的事情,除了我将它流式传输到一个 Tempfile,我想......

require 'csv'

# @param limit [Integer, 1_000] Number of rows per csv file
# @param callback [Proc] Proc taking one argument [CSV/io], that can be used after
#        each csv file is finished
module PacerPro
  class CSVDestination
    def initialize(limit: 1_000, callback: ->(obj) { })
      @limit = limit
      @callback = callback

      @csv = nil
      @row_count = 0
    end

    # @param row [Hash] returned from transforms
    def write(row)
      csv << row.values
      @row_count += 1
      return if row_count < limit

      self.close
    end

    # Called by Kiba when the transform pipeline is finished
    def close
      csv.close

      callback.call(csv)

      tempfile.unlink

      @csv = nil
      @row_count = 0
    end

    private

    attr_reader :limit, :callback
    attr_reader :row_count, :tempfile

    def csv
      @csv ||= begin
        @tempfile = Tempfile.new('csv')
        CSV.open(@tempfile, 'w')
      end
    end
  end
end
于 2018-06-15T18:00:03.217 回答
0

我不确定这里的确切问题。但是,我认为您的解决方案总体上似乎是正确的,但建议很少。

  1. 您也可以考虑gzip在发送到 S3 时每个 CSV 文件和它们有超过 10K 的记录。
  2. 您想查看menifest包含多个文件列表的创建,然后运行copy提供menifest文件作为输入的命令。
于 2018-06-15T06:49:35.540 回答
0

Kiba作者在这里。感谢您试用!

目前,实现这一点的最佳方式是创建我所说的“缓冲目的地”。(其中的一个版本可能最终会出现在 Kiba Common 中)。

(请彻底测试,我今天早上刚刚为您编写了它,根本没有运行它,尽管我过去使用的通用版本较少。另外请记住,这个版本使用内存缓冲区来存储 10k行,因此将数字增加到更大会消耗内存。不过,也可以创建一个内存消耗最少的版本,它会在你获得行时将行写入文件)

class BufferingDestination
  def initialize(buffer_size:, on_flush:)
    @buffer = []
    @buffer_size
    @on_flush = on_flush
    @batch_index = 0
  end

  def write(row)
    @buffer << row
    flush if @buffer.size >= buffer_size
  end

  def flush
    on_flush.call(batch_index: @batch_index, rows: @buffer)
    @batch_index += 1
    @buffer.clear
  end

  def close
    flush
  end
end

这是您可以像这样使用的东西,例如这里重用Kiba Common CSV 目标(尽管您也可以编写自己的):

require 'kiba-common/destinations/csv'

destination BufferingDestination,
  buffer_size: 10_000,
  on_flush: -> { |batch_index, rows|
    filename = File.join("output-#{sprintf("%08d", batch_index)}")
    csv = Kiba::Common::Destinations::CSV.new(
      filename: filename,
      csv_options: { ... },
      headers: %w(my fields here)
    )
    rows.each { |r| csv.write(r) }
    csv.close
  }

然后,您可以在生成文件后在块中触发您的COPY权限on_flush(如果您希望立即开始上传),或者在一个post_process块中触发您的权限(但这只会在所有 CSV 准备好后开始,这可以确保某些功能如果您愿意,可以使用事务性全局上传形式)。

如果你真的需要这个,你可以幻想并启动一个线程队列来实际并行处理上传(但要小心僵尸线程等)。

另一种方法是使用“多步骤”ETL 流程,其中一个脚本生成 CSV,另一个选择它们进行上传,同时运行(这是我在RubyKaigi 2018的演讲中解释的内容)。

让我知道事情是如何为你工作的!

于 2018-06-15T07:31:31.190 回答