2

我在 S3 上有一个目录结构,如下所示:

foo
  |-base
     |-2017
        |-01
           |-04
              |-part1.orc, part2.orc ....
  |-A
     |-2017
        |-01
           |-04
              |-part1.orc, part2.orc ....
  |-B
     |-2017
        |-01
           |-04
              |-part1.orc, part2.orc ....

这意味着对于目录,我根据作业的时间戳在给定路径中foo有多个输出表、、、、baseAB

在这种情况下,我希望left join他们都基于时间戳和主目录foo。这意味着将每个输出表baseAB等读入left join可以应用 a 的新单独输入表。一切以base表格为起点

像这样的东西(不工作的代码!)

val dfs: Seq[DataFrame] = spark.read.orc("foo/*/2017/01/04/*")
val base: DataFrame = spark.read.orc("foo/base/2017/01/04/*")

val result = dfs.foldLeft(base)((l, r) => l.join(r, 'id, "left"))

有人可以指出我如何获得该数据帧序列的正确方向吗?甚至可能值得将读取视为惰性或顺序读取,因此仅在应用连接时读取AB表以减少内存需求。

注意:目录结构不是最终的,这意味着如果适合解决方案,它可以更改。

4

2 回答 2

1

据我了解,Spark 使用底层 Hadoop API 来读取数据文件。因此继承的行为是将您指定的所有内容读入一个 RDD/DataFrame。

要实现您想要的,您可以首先获取目录列表:

    import org.apache.hadoop.conf.Configuration
    import org.apache.hadoop.fs.{ FileSystem, Path }

    val path = "foo/"

    val hadoopConf = new Configuration()
    val fs = FileSystem.get(hadoopConf)
    val paths: Array[String] = fs.listStatus(new Path(path)).
      filter(_.isDirectory).
      map(_.getPath.toString)

然后将它们加载到单独的数据框中:

    val dfs: Array[DataFrame] = paths.
      map(path => spark.read.orc(path + "/2017/01/04/*"))
于 2017-02-06T09:49:30.153 回答
0

这是(我认为)您正在尝试做的直接解决方案,不使用 Hive 或内置分区功能等额外功能:

import spark.implicits._

// load base
val baseDF = spark.read.orc("foo/base/2017/01/04").as("base")

// create or use existing Hadoop FileSystem - this should use the actual config and path
val fs = FileSystem.get(new URI("."), new Configuration())

// find all other subfolders under foo/
val otherFolderPaths = fs.listStatus(new Path("foo/"), new PathFilter {
  override def accept(path: Path): Boolean = path.getName != "base"
}).map(_.getPath)

// use foldLeft to join all, using the DF aliases to find the right "id" column
val result = otherFolderPaths.foldLeft(baseDF) { (df, path) =>
  df.join(spark.read.orc(s"$path/2017/01/04").as(path.getName), $"base.id" === $"${path.getName}.id" , "left") }
于 2017-02-06T09:36:16.300 回答