我正在使用https://github.com/databricks/spark-csv,我正在尝试编写单个 CSV,但不能,它正在创建一个文件夹。
需要一个 Scala 函数,该函数将采用路径和文件名等参数并写入该 CSV 文件。
我正在使用https://github.com/databricks/spark-csv,我正在尝试编写单个 CSV,但不能,它正在创建一个文件夹。
需要一个 Scala 函数,该函数将采用路径和文件名等参数并写入该 CSV 文件。
它正在创建一个包含多个文件的文件夹,因为每个分区都是单独保存的。如果您需要单个输出文件(仍在文件夹中),您可以repartition
(如果上游数据很大,但需要随机播放,则首选):
df
.repartition(1)
.write.format("com.databricks.spark.csv")
.option("header", "true")
.save("mydata.csv")
或coalesce
:
df
.coalesce(1)
.write.format("com.databricks.spark.csv")
.option("header", "true")
.save("mydata.csv")
保存前的数据框:
所有数据都将写入mydata.csv/part-00000
. 在使用此选项之前,请确保您了解正在发生的事情以及将所有数据传输到单个工作人员的成本。如果您将分布式文件系统与复制一起使用,数据将被多次传输——首先获取到单个工作人员,然后分布在存储节点上。
或者,您可以保留代码原样,然后使用HDFScat
或HDFSgetmerge
等通用工具简单地合并所有部分。
If you are running Spark with HDFS, I've been solving the problem by writing csv files normally and leveraging HDFS to do the merging. I'm doing that in Spark (1.6) directly:
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs._
def merge(srcPath: String, dstPath: String): Unit = {
val hadoopConfig = new Configuration()
val hdfs = FileSystem.get(hadoopConfig)
FileUtil.copyMerge(hdfs, new Path(srcPath), hdfs, new Path(dstPath), true, hadoopConfig, null)
// the "true" setting deletes the source files once they are merged into the new output
}
val newData = << create your dataframe >>
val outputfile = "/user/feeds/project/outputs/subject"
var filename = "myinsights"
var outputFileName = outputfile + "/temp_" + filename
var mergedFileName = outputfile + "/merged_" + filename
var mergeFindGlob = outputFileName
newData.write
.format("com.databricks.spark.csv")
.option("header", "false")
.mode("overwrite")
.save(outputFileName)
merge(mergeFindGlob, mergedFileName )
newData.unpersist()
Can't remember where I learned this trick, but it might work for you.
我在这里玩游戏可能有点晚了,但是使用coalesce(1)
orrepartition(1)
可能适用于小型数据集,但是大型数据集将全部放入一个节点上的一个分区中。这可能会引发 OOM 错误,或者充其量是处理缓慢。
我强烈建议您使用FileUtil.copyMerge()
Hadoop API 中的函数。这会将输出合并到一个文件中。
编辑- 这有效地将数据带到驱动程序而不是执行程序节点。Coalesce()
如果单个执行程序的 RAM 比驱动程序多,那会很好。
编辑 2:copyMerge()
在 Hadoop 3.0 中被删除。有关如何使用最新版本的更多信息,请参阅以下堆栈溢出文章:如何在 Hadoop 3.0 中进行 CopyMerge?
如果您正在使用 Databricks 并且可以将所有数据放入一名工作人员的 RAM 中(因此可以使用.coalesce(1)
),则可以使用 dbfs 查找并移动生成的 CSV 文件:
val fileprefix= "/mnt/aws/path/file-prefix"
dataset
.coalesce(1)
.write
//.mode("overwrite") // I usually don't use this, but you may want to.
.option("header", "true")
.option("delimiter","\t")
.csv(fileprefix+".tmp")
val partition_path = dbutils.fs.ls(fileprefix+".tmp/")
.filter(file=>file.name.endsWith(".csv"))(0).path
dbutils.fs.cp(partition_path,fileprefix+".tab")
dbutils.fs.rm(fileprefix+".tmp",recurse=true)
如果您的文件不适合工作人员的 RAM,您可能需要考虑 chaotic3quilibrium 使用 FileUtils.copyMerge() 的建议。我还没有这样做,也不知道是否可能,例如在 S3 上。
这个答案是建立在这个问题的先前答案以及我自己对提供的代码片段的测试之上的。我最初将它发布到 Databricks并在这里重新发布。
我找到的关于 dbfs 的 rm 递归选项的最佳文档是在Databricks 论坛上。
spark 的df.write()
API 将在给定路径内创建多个部分文件...强制 spark 只写入单个部分文件使用df.coalesce(1).write.csv(...)
,而不是df.repartition(1).write.csv(...)
因为 coalesce 是一个狭窄的转换,而 repartition 是一个广泛的转换参见Spark - repartition() vs coalesce()
df.coalesce(1).write.csv(filepath,header=True)
part-0001-...-c000.csv
将使用一个文件在给定的文件路径中创建文件夹
cat filepath/part-0001-...-c000.csv > filename_you_want.csv
有一个用户友好的文件名
此答案扩展了已接受的答案,提供了更多上下文,并提供了您可以在机器上的 Spark Shell 中运行的代码片段。
有关已接受答案的更多上下文
接受的答案可能会给您留下示例代码输出单个mydata.csv
文件的印象,但事实并非如此。让我们演示一下:
val df = Seq("one", "two", "three").toDF("num")
df
.repartition(1)
.write.csv(sys.env("HOME")+ "/Documents/tmp/mydata.csv")
这是输出的内容:
Documents/
tmp/
mydata.csv/
_SUCCESS
part-00000-b3700504-e58b-4552-880b-e7b52c60157e-c000.csv
NBmydata.csv
是已接受答案中的文件夹 - 它不是文件!
如何输出具有特定名称的单个文件
我们可以使用spark-daria写出单个mydata.csv
文件。
import com.github.mrpowers.spark.daria.sql.DariaWriters
DariaWriters.writeSingleFile(
df = df,
format = "csv",
sc = spark.sparkContext,
tmpFolder = sys.env("HOME") + "/Documents/better/staging",
filename = sys.env("HOME") + "/Documents/better/mydata.csv"
)
这将输出文件如下:
Documents/
better/
mydata.csv
S3 路径
您需要传递 s3a 路径才能DariaWriters.writeSingleFile
在 S3 中使用此方法:
DariaWriters.writeSingleFile(
df = df,
format = "csv",
sc = spark.sparkContext,
tmpFolder = "s3a://bucket/data/src",
filename = "s3a://bucket/data/dest/my_cool_file.csv"
)
请参阅此处了解更多信息。
避免复制合并
copyMerge 已从 Hadoop 3 中删除。DariaWriters.writeSingleFile
实现使用fs.rename
,如此处所述。 Spark 3 仍然使用 Hadoop 2,因此 copyMerge 实现将在 2020 年工作。我不确定 Spark 何时会升级到 Hadoop 3,但最好避免在 Spark 升级 Hadoop 时导致代码中断的任何 copyMerge 方法。
源代码
DariaWriters
如果您想检查实现,请在 spark-daria 源代码中 查找对象。
PySpark 实现
使用 PySpark 写出单个文件更容易,因为您可以将 DataFrame 转换为默认情况下作为单个文件写出的 Pandas DataFrame。
from pathlib import Path
home = str(Path.home())
data = [
("jellyfish", "JALYF"),
("li", "L"),
("luisa", "LAS"),
(None, None)
]
df = spark.createDataFrame(data, ["word", "expected"])
df.toPandas().to_csv(home + "/Documents/tmp/mydata-from-pyspark.csv", sep=',', header=True, index=False)
限制
DariaWriters.writeSingleFile
Scala 方法和Pythondf.toPandas()
方法仅适用于小型数据集。巨大的数据集不能写成单个文件。从性能的角度来看,将数据作为单个文件写入并不是最佳的,因为数据不能并行写入。
我在 Python 中使用它来获取单个文件:
df.toPandas().to_csv("/tmp/my.csv", sep=',', header=True, index=False)
适用于从 Minkymorgan 修改的 S3 的解决方案。
如果要删除原始目录,只需将临时分区目录路径(名称与最终路径不同)作为srcPath
最终的 csv/txt 作为destPath
指定。deleteSource
/**
* Merges multiple partitions of spark text file output into single file.
* @param srcPath source directory of partitioned files
* @param dstPath output path of individual path
* @param deleteSource whether or not to delete source directory after merging
* @param spark sparkSession
*/
def mergeTextFiles(srcPath: String, dstPath: String, deleteSource: Boolean): Unit = {
import org.apache.hadoop.fs.FileUtil
import java.net.URI
val config = spark.sparkContext.hadoopConfiguration
val fs: FileSystem = FileSystem.get(new URI(srcPath), config)
FileUtil.copyMerge(
fs, new Path(srcPath), fs, new Path(dstPath), deleteSource, config, null
)
}
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs._
import org.apache.spark.sql.{DataFrame,SaveMode,SparkSession}
import org.apache.spark.sql.functions._
我使用以下方法解决了(hdfs 重命名文件名):-
步骤 1:- (Crate Data Frame 并写入 HDFS)
df.coalesce(1).write.format("csv").option("header", "false").mode(SaveMode.Overwrite).save("/hdfsfolder/blah/")
第 2 步:-(创建 Hadoop 配置)
val hadoopConfig = new Configuration()
val hdfs = FileSystem.get(hadoopConfig)
Step3 :- (在 hdfs 文件夹路径中获取路径)
val pathFiles = new Path("/hdfsfolder/blah/")
Step4:- (从 hdfs 文件夹中获取 spark 文件名)
val fileNames = hdfs.listFiles(pathFiles, false)
println(fileNames)
setp5:- (创建 scala 可变列表以保存所有文件名并将其添加到列表中)
var fileNamesList = scala.collection.mutable.MutableList[String]()
while (fileNames.hasNext) {
fileNamesList += fileNames.next().getPath.getName
}
println(fileNamesList)
第6步:-(从文件名scala列表中过滤_SUCESS文件顺序)
// get files name which are not _SUCCESS
val partFileName = fileNamesList.filterNot(filenames => filenames == "_SUCCESS")
第7步:-(将scala列表转换为字符串并将所需文件名添加到hdfs文件夹字符串,然后应用重命名)
val partFileSourcePath = new Path("/yourhdfsfolder/"+ partFileName.mkString(""))
val desiredCsvTargetPath = new Path(/yourhdfsfolder/+ "op_"+ ".csv")
hdfs.rename(partFileSourcePath , desiredCsvTargetPath)
在保存之前重新分区/合并到 1 个分区(您仍然会得到一个文件夹,但其中会有一个部分文件)
您可以使用rdd.coalesce(1, true).saveAsTextFile(path)
它将数据作为单个文件存储在 path/part-00000
spark.sql("select * from df").coalesce(1).write.option("mode","append").option("header","true").csv("/your/hdfs/path/")
spark.sql("select * from df")
--> 这是数据框
coalesce(1)
或repartition(1)
--> 这将使您的输出文件仅成为 1 个部分文件
write
--> 写入数据
option("mode","append")
--> 将数据附加到现有目录
option("header","true")
--> 启用标题
csv("<hdfs dir>")
--> 写入为 CSV 文件及其在 HDFS 中的输出位置
def export_csv(
fileName: String,
filePath: String
) = {
val filePathDestTemp = filePath + ".dir/"
val merstageout_df = spark.sql(merstageout)
merstageout_df
.coalesce(1)
.write
.option("header", "true")
.mode("overwrite")
.csv(filePathDestTemp)
val listFiles = dbutils.fs.ls(filePathDestTemp)
for(subFiles <- listFiles){
val subFiles_name: String = subFiles.name
if (subFiles_name.slice(subFiles_name.length() - 4,subFiles_name.length()) == ".csv") {
dbutils.fs.cp (filePathDestTemp + subFiles_name, filePath + fileName+ ".csv")
dbutils.fs.rm(filePathDestTemp, recurse=true)
}}}
通过使用 Listbuffer,我们可以将数据保存到单个文件中:
import java.io.FileWriter
import org.apache.spark.sql.SparkSession
import scala.collection.mutable.ListBuffer
val text = spark.read.textFile("filepath")
var data = ListBuffer[String]()
for(line:String <- text.collect()){
data += line
}
val writer = new FileWriter("filepath")
data.foreach(line => writer.write(line.toString+"\n"))
writer.close()
还有另一种使用 Java 的方法
import java.io._
def printToFile(f: java.io.File)(op: java.io.PrintWriter => Unit)
{
val p = new java.io.PrintWriter(f);
try { op(p) }
finally { p.close() }
}
printToFile(new File("C:/TEMP/df.csv")) { p => df.collect().foreach(p.println)}