我正在尝试将所有火花输出部分文件合并到一个目录中,并在 Scala 中创建一个文件。
这是我的代码:
import org.apache.spark.sql.functions.input_file_name
import org.apache.spark.sql.functions.regexp_extract
def merge(srcPath: String, dstPath: String): Unit = {
val hadoopConfig = new Configuration()
val hdfs = FileSystem.get(hadoopConfig)
FileUtil.copyMerge(hdfs, new Path(srcPath), hdfs, new Path(dstPath), true, hadoopConfig, null)
// the "true" setting deletes the source files once they are merged into the new output
}
然后在最后一步,我正在编写如下所示的数据帧输出。
dfMainOutputFinalWithoutNull.repartition(10).write.partitionBy("DataPartition","StatementTypeCode")
.format("csv")
.option("nullValue", "")
.option("header", "true")
.option("codec", "gzip")
.mode("overwrite")
.save(outputfile)
merge(mergeFindGlob, mergedFileName )
dfMainOutputFinalWithoutNull.unpersist()
当我运行它时,我得到以下异常
java.io.FileNotFoundException: File does not exist: hdfs:/user/zeppelin/FinancialLineItem/temp_FinancialLineItem
at org.apache.hadoop.hdfs.DistributedFileSystem$22.doCall(DistributedFileSystem.java:1309)
这就是我获得输出的方式
而不是文件夹,我想合并文件夹内的所有文件并创建一个文件。