0

我们使用带有滚动卷的 MapR FS,并且有必要将分区输出 parquet 文件与相应的卷对齐。

df
 .write
 .partitionBy("year", "month", "day", "hour")
 .parquet("/data/nfs/{year}/{month}/{day}/datastore")

这个想法是通过自定义输出提交器在运行时解析路径:

class MaprParquetOutputComitter(outputPath: Path, context: TaskAttemptContext)
  extends ParquetOutputCommitter(
    MaprParquetOutputComitter.resolvePath(outputPath), 
    context)

object MaprParquetOutputComitter extends StrictLogging {
  def resolvePath(p: Path): Path = {
    logger.info(p.toString)

    // Retrieve year, month and day from path object
    // and replace placeholders with extracted values

    new Path(resolvedPath)
  }
}

不幸的是,outputPath看起来像这样"/data/nfs/{year}/{month}/{day}/datastore",但不像"/data/nfs/{year}/{month}/{day}/datastore/year=2018/month=6/day=25/hour=8"

有没有办法通过这种方法(覆盖ParquetOutputCommitter)获得包含分区的输出路径?或者,也许,有另一种解决方案来实现这个目标?

4

0 回答 0