我正在尝试将数据框保存为本地驱动器中的 CSV 文件。但是,当我这样做时,我会生成一个文件夹,并在该分区中写入文件。有什么建议可以克服这个吗?
我的要求: 获取代码中给出的实际名称的普通 csv 文件。
代码片段:
dataframe.coalesce(1).write.mode("overwrite").format("com.databricks.spark.csv").option("header", "true").csv("E:/dataframe.csv")
我正在尝试将数据框保存为本地驱动器中的 CSV 文件。但是,当我这样做时,我会生成一个文件夹,并在该分区中写入文件。有什么建议可以克服这个吗?
我的要求: 获取代码中给出的实际名称的普通 csv 文件。
代码片段:
dataframe.coalesce(1).write.mode("overwrite").format("com.databricks.spark.csv").option("header", "true").csv("E:/dataframe.csv")
TL:DR您正在尝试在分布式环境中强制执行顺序的核心概念。结局不可能很好。
Spark 不提供这样的实用程序。为了能够以半分布式方式创建一个,您必须实现多步、源依赖协议,其中:
由于这具有有限的应用程序,仅对小文件有用,并且对于某些源(如对象存储)可能非常昂贵,因此在 Spark 中没有实现这样的功能。
您当然可以收集数据,使用标准 CSV 解析器(Univoicity,Apache Commons),然后放入您选择的存储中。这是顺序的,需要多次数据传输。
没有自动的方法可以做到这一点。我看到两个解决方案
part-*csv
文件移动/重命名为所需的名称但是这两种解决方案都会破坏并行性,从而破坏火花的目标。
这是不可能的,但你可以这样做:
dataframe.coalesce(1).write.mode("overwrite").format("com.databricks.spark.csv").option("header", "true").csv("E:/data/")
import org.apache.hadoop.fs._
val fs = FileSystem.get(sc.hadoopConfiguration)
val filePath = "E:/data/"
val fileName = fs.globStatus(new Path(filePath+"part*"))(0).getPath.getName
fs.rename(new Path(filePath+fileName), new Path(filePath+"dataframe.csv"))