0

我将数据拆分为多个文件。我想加载并加入文件。我想构建一个动态函数 1. 将 n 个数据文件连接到一个数据框 2. 给定文件位置和连接列的输入(例如,pk)

我认为这可以用 foldLeft 完成,但我不太确定如何:

到目前为止,这是我的代码:

@throws
def dataJoin(path:String, fileNames:String*): DataFrame=
{
  try
  {
    val dfList:ArrayBuffer[DataFrame]=new ArrayBuffer
    for(fileName <- fileNames)
    {
      val df:DataFrame=DataFrameUtils.openFile(spark, s"$path${File.separator}$fileName")
      dfList += df
    }

    dfList.foldLeft
    {
             (df,df1) => joinDataFrames(df,df1, "UID")
    }
  }
  catch
  {
    case e:Exception => throw new Exception(e)
  }
}


def joinDataFrames(df:DataFrame,df1:DataFrame, joinColum:String): Unit =
{
  df.join(df1, Seq(joinColum))
}
4

1 回答 1

3

foldLeft可能确实适合这里,但它需要一个“零”元素来开始折叠(除了折叠功能)。在这种情况下,“零”可以是第一个 DataFrame:

dfList.tail.foldLeft(dfList.head) { (df1, df2) => df1.join(df2, "UID") }

为避免错误,您可能希望在尝试访问第一项之前确保列表不为空 - 一种方法是使用模式匹配。

dfList match {
  case head :: tail => tail.foldLeft(head) { (df1, df2) => df1.join(df2, "UID") }
  case Nil => spark.emptyDataFrame
}

最后,映射一个集合而不是迭代它并填充另一个(空的、可变的)集合更简单、更安全、更惯用:

val dfList = fileNames.map(fileName => DataFrameUtils.openFile(spark, s"$path${File.separator}$fileName"))

共:

def dataJoin(path:String, fileNames: String*): DataFrame = {
  val dfList = fileNames
    .map(fileName => DataFrameUtils.openFile(spark, s"$path${File.separator}$fileName"))
    .toList

  dfList match {
    case head :: tail => tail.foldLeft(head) { (df1, df2) => df1.join(df2, "UID") }
    case Nil => spark.emptyDataFrame
  }
}
于 2017-10-04T19:34:01.790 回答