我有以下数据框:
+-----+-------+-----+------+
|city1| city2| year| month|
+-----+-------+-----+------+
| AAA| QQQ| 2019| 2|
| BBB| WWW| 2018| 5|
| CCC| RRR| 2019| 2|
| DDD| EEE| 2019| 7|
+-----+-------+-----+------+
我想将输出作为 CSV 写入按年和月分区的文件夹中。例如,上述案例将有 3 个文件夹路径,如下所示:
basepath/year=2018/month=5 (with 1 record)
basepath/year=2019/month=2 (with 2 records)
basepath/year=2019/month=7 (with 1 record)
不幸的是,我必须使用Spark 1.5.0
with Scala
which 没有函数可以根据所需列中的唯一值轻松写入这些分区。
到目前为止,我有以下代码:
// Assuming df as the above example
val uniqueYears = df.select("year").distinct().map(_ getString(0)).collect.toList
val uniqueMonths = df.select("month").distinct().map(_ getString(0)).collect.toList
for (year <- uniqueYears){
for (month <- uniqueMonths){
val outputDf = df.filter((df("year") === year) &&
(df("month") === month))
val outputPath = s"${basePath}/departure_year=${year}/departure_month=${month}"
outputDf.write.format("com.databricks.spark.csv")
.option("delimiter", "^")
.option("nullValue", "")
.option("treatEmptyValuesAsNulls", "false")
.save(outputPath)
}
}
这里的问题是,我看到输出写入是按顺序发生的,即它从循环中的第一年开始,完成其中的所有月份,然后跳到下一年,依此类推。这给作业增加了不必要的运行时间,因为我看到写入每个分区需要几乎完全相同的时间,即使数据在分区之间分布不均。
有什么方法可以并行化此代码,以便将 CSV 并行而不是顺序写入这些文件夹?
请注意,我不能使用其他版本的 Spark,除了1.5.0