5

我目前正在使用Apache Arrow的 java API(尽管我在 Scala 中使用它作为代码示例)来熟悉这个工具。

作为练习,我选择将 CSV 文件加载到箭头向量中,然后将它们保存到箭头文件中。第一部分似乎很容易,我这样尝试:

val csvLines: Stream[Array[String]] = <open stream from CSV parser>

// There are other types of allocator, but things work with this one...
val allocator = new RootAllocator(Int.MaxValue)

// Initialize the vectors
val vectors = initVectors(csvLines.head, allocator)
// Put their mutators into an array for easy access
val mutators = vectors.map(_.getMutator)

// Work on the data, zipping it with its index 
Stream.from(0)
  .zip(csvLines.tail) // Work on the tail (head contains the headers)
  .foreach(rowTup =>  // rowTup = (index, csvRow as an Array[String])
    Range(0, rowTup._2.size) // Iterate on each column...
      .foreach(columnNumber =>
        writeToMutator(
          mutators(columnNumber), // get that column's mutator
          idx=rowTup._1,          // pass the current row number
          data=rowTup._2(columnNumber) // pass the entry of the curernt column
        )
      )
)

并定义为initVectors()writeToMutator()

def initVectors(
  columns: Array[String], 
  alloc: RootAllocator): Array[NullableVarCharVector] = {

  // Initialize a vector for each column
  val vectors = columns.map(colName => 
    new NullableVarCharVector(colName, alloc))
  // 4096 size, for 1024 values initially. This is arbitrary
  vectors.foreach(_.allocateNew(2^12,1024))
  vectors
}

def writeToMutator(
  mutator: NullableVarCharVector#Mutator, 
  idx: Int, 
  data: String): Unit = {

  // The CSV may contain null values
  if (data != null) {
    val bytes = data.getBytes()
    mutator.setSafe(idx, bytes, 0, bytes.length)
  }
  mutator.setNull(idx)
}

(我目前不关心使用正确的类型,并将所有内容存储为字符串或VarChar箭头的燕鸥)

所以在这一点上,我有一个集合,NullableVarCharVector可以读写它们。在这一点上一切都很好。但是,现在,对于下一步,我想知道如何将它们实际包装在一起并将它们序列化为箭头文件。我偶然发现了一个AbstractFieldWriter抽象类,但不清楚如何使用这些实现。

所以,问题主要是:

  • 将一堆矢量保存到箭头文件的(最好的?-似乎有多个)方法是什么。
  • 还有其他将 CSV 列加载到箭头向量的方法吗?

编辑添加:数据描述页面提供了关于该主题的良好总体概述。

api 的测试类似乎包含一些可以提供帮助的东西,一旦我尝试过,我会发布一个带有示例的回复。

4

1 回答 1

8

查看TestArrowFile.javaBaseFileTest.java我发现:

  • 如何将单个箭头文件写入磁盘
  • 另一种填充向量的方法,因为我的第一次尝试阻止我组装单个箭头文件(或至少以直接的方式这样做)。

因此,填充向量现在看起来像:

// Open stream of rows 
val csvLines: Stream[Array[String]] = <open stream from CSV parser>
// Define a parent to hold the vectors
val parent = MapVector.empty("parent", allocator)
// Create a new writer. VarCharWriterImpl would probably do as well?
val writer = new ComplexWriterImpl("root", parent)

// Initialise a writer for each column, using the header as the name
val rootWriter = writer.rootAsMap()
val writers = csvLines.head.map(colName => 
                                  rootWriter.varChar(colName))

Stream.from(0)
  .zip(csvLines.tail) // Zip the rows with their index
  .foreach( rowTup => { // Iterate on each (index, row) tuple
    val (idx, row) = rowTup
      Range(0, row.size) // Iterate on each field of the row
        .foreach(column =>
          Option(row(column)) // row(column) may be null,
            .foreach(str =>   // use the option as a null check
              write(writers(column), idx, allocator, str)
            )
      )
  }
)

toFile(parent.getChild("root"), "csv.arrow") // Save everything to a file

write定义为:

def write(writer: VarCharWriter, idx: Int, 
  allocator: BufferAllocator, data: String): Unit = {
  // Set the position to the correct index
  writer.setPosition(idx)
  val bytes = data.getBytes()
  // Apparently the allocator is required again to build a new buffer
  val varchar = allocator.buffer(bytes.length)
  varchar.setBytes(0, data.getBytes())
  writer.writeVarChar(0, bytes.length, varchar)
}

def toFile(parent: FieldVector, fName: String): Unit = {
  // Extract a schema from the parent: that's the part I struggled with in the original question
  val rootSchema = new VectorSchemaRoot(parent)
  val stream = new FileOutputStream(fName)
  val fileWriter = new ArrowFileWriter(
                        rootSchema,
                        null, // We don't use dictionary encoding.
                        stream.getChannel)
  // Write everything to file...
  fileWriter.start()
  fileWriter.writeBatch()
  fileWriter.end()
  stream.close()
}

有了上面的内容,我可以将 CSV 保存到文件中。我通过阅读并再次将其转换为 CSV 来检查一切顺利,并且内容没有改变。

请注意,ComplexWriterImpl允许写入不同类型的列,这将派上用场,以避免将数字列存储为字符串。

(我现在正在研究事物的阅读方面,这些事情可能值得他们自己的 SO 问题。)

于 2017-10-23T11:11:28.187 回答