0

我有以下数据框:

+-----+-------+-----+------+
|city1|  city2| year| month|
+-----+-------+-----+------+
|  AAA|    QQQ| 2019|     2|
|  BBB|    WWW| 2018|     5|
|  CCC|    RRR| 2019|     2|
|  DDD|    EEE| 2019|     7|
+-----+-------+-----+------+

我想将输出作为 CSV 写入按年和月分区的文件夹中。例如,上述案例将有 3 个文件夹路径,如下所示:

basepath/year=2018/month=5   (with 1 record)
basepath/year=2019/month=2   (with 2 records)
basepath/year=2019/month=7   (with 1 record)

不幸的是,我必须使用Spark 1.5.0with Scalawhich 没有函数可以根据所需列中的唯一值轻松写入这些分区。

到目前为止,我有以下代码:

// Assuming df as the above example

val uniqueYears = df.select("year").distinct().map(_ getString(0)).collect.toList
val uniqueMonths = df.select("month").distinct().map(_ getString(0)).collect.toList

for (year <- uniqueYears){
  for (month <- uniqueMonths){
  val outputDf = df.filter((df("year") === year) &&
                           (df("month") === month))
  val outputPath = s"${basePath}/departure_year=${year}/departure_month=${month}"

  outputDf.write.format("com.databricks.spark.csv")
          .option("delimiter", "^")
          .option("nullValue", "")
          .option("treatEmptyValuesAsNulls", "false")
          .save(outputPath)
  }
}

这里的问题是,我看到输出写入是按顺序发生的,即它从循环中的第一年开始,完成其中的所有月份,然后跳到下一年,依此类推。这给作业增加了不必要的运行时间,因为我看到写入每个分区需要几乎完全相同的时间,即使数据在分区之间分布不均。

有什么方法可以并行化此代码,以便将 CSV 并行而不是顺序写入这些文件夹?

请注意,我不能使用其他版本的 Spark,除了1.5.0

4

0 回答 0