1

我正在尝试将所有火花输出部分文件合并到一个目录中,并在 Scala 中创建一个文件。

这是我的代码:

import org.apache.spark.sql.functions.input_file_name
import org.apache.spark.sql.functions.regexp_extract

def merge(srcPath: String, dstPath: String): Unit =  {
   val hadoopConfig = new Configuration()
   val hdfs = FileSystem.get(hadoopConfig)
   FileUtil.copyMerge(hdfs, new Path(srcPath), hdfs, new Path(dstPath), true, hadoopConfig, null) 
   // the "true" setting deletes the source files once they are merged into the new output
}

然后在最后一步,我正在编写如下所示的数据帧输出。

dfMainOutputFinalWithoutNull.repartition(10).write.partitionBy("DataPartition","StatementTypeCode")
  .format("csv")
  .option("nullValue", "")
  .option("header", "true")
  .option("codec", "gzip")
  .mode("overwrite")
  .save(outputfile)
  merge(mergeFindGlob, mergedFileName )
  dfMainOutputFinalWithoutNull.unpersist()

当我运行它时,我得到以下异常

java.io.FileNotFoundException: File does not exist: hdfs:/user/zeppelin/FinancialLineItem/temp_FinancialLineItem
  at org.apache.hadoop.hdfs.DistributedFileSystem$22.doCall(DistributedFileSystem.java:1309)

这就是我获得输出的方式

在此处输入图像描述

而不是文件夹,我想合并文件夹内的所有文件并创建一个文件。

4

1 回答 1

0

Hadoop 2 中有一个 copyMerge API: https ://hadoop.apache.org/docs/r2.7.1/api/src-html/org/apache/hadoop/fs/FileUtil.html#line.382

不幸的是,这将在 Hadoop 3.0 中被弃用和删除。

这是 copyMerge 的重新实现(尽管在 PySpark 中)我不得不写,因为我们找不到更好的解决方案: https ://github.com/Tagar/stuff/blob/master/copyMerge.py

希望它也对其他人有所帮助。

于 2017-10-24T21:04:15.787 回答