Kiba作者在这里。感谢您试用!
目前,实现这一点的最佳方式是创建我所说的“缓冲目的地”。(其中的一个版本可能最终会出现在 Kiba Common 中)。
(请彻底测试,我今天早上刚刚为您编写了它,根本没有运行它,尽管我过去使用的通用版本较少。另外请记住,这个版本使用内存缓冲区来存储 10k行,因此将数字增加到更大会消耗内存。不过,也可以创建一个内存消耗最少的版本,它会在你获得行时将行写入文件)
class BufferingDestination
def initialize(buffer_size:, on_flush:)
@buffer = []
@buffer_size
@on_flush = on_flush
@batch_index = 0
end
def write(row)
@buffer << row
flush if @buffer.size >= buffer_size
end
def flush
on_flush.call(batch_index: @batch_index, rows: @buffer)
@batch_index += 1
@buffer.clear
end
def close
flush
end
end
这是您可以像这样使用的东西,例如这里重用Kiba Common CSV 目标(尽管您也可以编写自己的):
require 'kiba-common/destinations/csv'
destination BufferingDestination,
buffer_size: 10_000,
on_flush: -> { |batch_index, rows|
filename = File.join("output-#{sprintf("%08d", batch_index)}")
csv = Kiba::Common::Destinations::CSV.new(
filename: filename,
csv_options: { ... },
headers: %w(my fields here)
)
rows.each { |r| csv.write(r) }
csv.close
}
然后,您可以在生成文件后在块中触发您的COPY
权限on_flush
(如果您希望立即开始上传),或者在一个post_process
块中触发您的权限(但这只会在所有 CSV 准备好后开始,这可以确保某些功能如果您愿意,可以使用事务性全局上传形式)。
如果你真的需要这个,你可以幻想并启动一个线程队列来实际并行处理上传(但要小心僵尸线程等)。
另一种方法是使用“多步骤”ETL 流程,其中一个脚本生成 CSV,另一个选择它们进行上传,同时运行(这是我在RubyKaigi 2018的演讲中解释的内容)。
让我知道事情是如何为你工作的!