
I have huge CSV files (100MB+) on Amazon S3 that I want to read in chunks and process with the Ruby CSV library. I'm having a hard time creating the right IO object for the CSV processing:

buffer = TheRightIOClass.new
bytes_received = 0
RightAws::S3Interface.new(<access_key>, <access_secret>).retrieve_object(bucket, key) do |chunk|
  bytes_received += buffer.write(chunk)
  if bytes_received >= 1*MEGABYTE
    bytes_received = 0
    csv(buffer).each do |row|
      process_csv_record(row)
    end
  end
end

def csv(io)
  @csv ||= CSV.new(io, headers: true)
end

I don't know what the right setup should be here, or what TheRightIOClass is. I don't want to load the whole file into memory with a StringIO. Is there a buffered IO or ring buffer in Ruby that can do this? If someone has a good solution using threads (no processes) and pipes, I'd love to see it.
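The threads-and-pipe version I have in mind would be roughly the sketch below, reusing the names from my snippet above, though I'm not sure it's the right approach. IO.pipe blocks the writer when its buffer fills, so only a small window of the file should sit in memory at a time:

require 'csv'
require 'right_aws'

reader, writer = IO.pipe

# Writer thread: stream S3 chunks into the pipe as they arrive
producer = Thread.new do
  RightAws::S3Interface.new(access_key, access_secret)
                       .retrieve_object(bucket, key) { |chunk| writer.write(chunk) }
  writer.close
end

# Main thread: CSV pulls complete rows off the read end of the pipe
CSV.new(reader, headers: true).each do |row|
  process_csv_record(row)
end

producer.join
reader.close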


1 Answer


You can use a StringIO and do some clever error handling to make sure you have an entire row in a chunk before handling it. The packer class in this example just accumulates the parsed rows in memory until you flush them to disk or a database.

require 'csv'
require 'aws-sdk'  # aws-sdk v1, which provides the AWS::S3 interface used below

packer = Packer.new
object = AWS::S3.new.buckets[bucket].objects[path]
io = StringIO.new
csv = ::CSV.new(io, headers: true)
object.read do |chunk|
  # Append the most recent chunk and rewind the IO
  io << chunk
  io.rewind
  last_offset = 0
  begin
    while (row = csv.shift)
      # Store the parsed row unless we're at the end of a chunk
      unless io.eof?
        last_offset = io.pos
        packer << row.to_hash
      end
    end
  rescue ArgumentError, ::CSV::MalformedCSVError => e
    # Only rescue malformed UTF-8 and CSV errors if we're at the end of the chunk
    raise e unless io.eof?
  end
  # Seek to our last offset, replace the buffer with the leftover partial row,
  # and advance the cursor so the next chunk is appended after it
  io.seek(last_offset)
  io.reopen(io.read)
  io.read
  # Flush our accumulated rows to disk every 1 MB
  packer.flush if packer.bytes > 1 * MEGABYTES
end
# Read the last row
io.rewind
packer << csv.shift.to_hash
packer
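The Packer class isn't defined in this snippet; a minimal sketch, assuming it just buffers row hashes, tracks a rough byte count, and clears itself on flush (with MEGABYTES taken as 1024 * 1024), might look like:

MEGABYTES = 1024 * 1024

# Minimal Packer sketch: accumulates parsed rows in memory and tracks an
# approximate byte count so the caller knows when to flush.
class Packer
  attr_reader :bytes

  def initialize
    @rows  = []
    @bytes = 0
  end

  def <<(row_hash)
    @rows  << row_hash
    @bytes += row_hash.to_s.bytesize  # rough size estimate
    self
  end

  def flush
    # Persist @rows to disk or a database here; this sketch just discards them.
    @rows.clear
    @bytes = 0
  end
end

In a real setup, flush would write the buffered rows wherever you need them; that's the only behavior the main loop above relies on.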
Answered 2012-11-28T06:50:17.700