0

我有一个 spark 工作,我有一个巨大的文件作为输出 300 gb 到 S3 。我的要求是重命名所有零件文件,然后我们必须移动到最终文件夹。

我进行了研究,但找不到解决方案,我可以在 spark 作业本身中重命名我的 spark 输出文件。

所以我想出了一个计划,从 S3 读回 spark 输出文件,然后再次重命名,然后在 S3 文件夹中再次写回。

但是我的 Spark 作业需要 25 分钟才能完成,但在 S3 中再次阅读、重命名和复制需要 45 分钟。

这对我来说太令人沮丧了。

无论如何我可以使这个过程更快吗?问题是在 spark 作业之后,此过程仅在核心节点上运行,因此需要很长时间。

这就是我所做的。

 val file = fs.globStatus(new Path(outputFileURL + "/*/*/*"))
for (urlStatus <- file) {

      val DataPartitionName = urlStatus.getPath.toString.split("=")(1).split("\\/")(0).toString
      val StatementTypeCode = urlStatus.getPath.toString.split("=")(2).split("\\/")(0).toString

      val finalFileName = finalPrefix + DataPartitionName + "." + StatementTypeCode+ "."  + fileVersion + currentTime + fileExtention
      val dest = new Path(mainFileURL + "/" + finalFileName)
      fs.rename(urlStatus.getPath, dest)

    }
    println("File renamed and moved to dir now delete output folder")
    myUtil.Utility.DeleteOuptuFolder(fs, outputFileURL)

有没有办法利用以下两个选项

  1. S3 DIST CP 命令 ? 据我研究,我没有发现在 S3 dist CP 中重命名文件。我正在根据文件路径进行重命名。

  2. 我可以使用 shell 命令活动来读取重命名和复制吗?

4

1 回答 1

1

问题是 S3 重命名实际上是作为复制和删除来实现的,所以如果你有很多大文件,它会花费更长的时间。

我建议用 spark 写入 HDFS,然后在 HDFS 上本地执行文件名操作,在其中您实际上具有原子重命名语义,然后使用 S3DistCp 将现在正确命名的文件复制到目标位置,然后删除文件如果您需要空间,请使用 HDFS。

于 2018-04-11T07:23:30.190 回答