0

我在存储在 s3 位置的配置单元中创建了一个表。它有大约 10 列,并按相同的顺序划分为 3 列月、年和城市。

我正在运行一个创建数据帧(20 亿行)并写入该表的 spark 作业。

val partitions:Seq[Column] = Seq(col("month"),col("year"),col("city"))
df.repartition(partitions: _*).selectExpr(cs.map(_.name): _*).write.mode("overwrite").insertInto(s"$tableName")

selectExpr(cs.map(_.name): _*) 对数据框中的列重新排序以与表中的顺序保持一致。

当我运行上述命令插入表时,我看到每个城市下创建了如此多的暂存文件和多个小文件。

s3://s3Root/tableName/month/year/city/file1.csv
                                      file2.csv
                                      ...
                                      file200.csv

我希望每年每月在每个城市下获得一个文件。 合并每个分区。

预期的:

s3://s3Root/tableName/month/year/city/file.csv

任何帮助表示赞赏。

4

1 回答 1

0

要按分区实现一个文件,您应该使用

.partitionBy("")

val partitions:Seq[Column] = Seq(col("month"),col("year"),col("city"))

df.repartition(partitions: _*).selectExpr(cs.map(_.name): _*).write.partitionBy(partitions: _*).mode("overwrite").insertInto(s"$tableName")

我认为你可以避免之前重新分区,如果你只做 partitionBy,文件将被每个分区一个分区。

于 2018-03-12T22:20:30.827 回答