0

我正在将 spark 工作从私有数据中心迁移到 GCP。早些时候,我使用 hadoop.fs 中的 FileSystem 在 maprfs 中传输和删除文件。但是对于 GCP 存储桶,这是行不通的。我必须将文件从目录 A 传输到同一 GCP 存储桶中的目录 B,然后在传输后从目录 A 中删除文件。以下两种方法在 GCP 中不起作用。您能否建议 scala 中支持传输和删除谷歌云存储桶中文件的任何其他库。

删除方法:

import org.apache.hadoop.fs.{LocatedFileStatus, Path, RemoteIterator, FileSystem, FileUtil}
def deleteFiles(sc: SparkContext, reingestHdfsFiles: String): Unit = {
val files = hdfsFiles.split(FILE_SEPERATOR)
var fs: FileSystem = null
try {
  fs = path.getFileSystem(sc.hadoopConfiguration)
  for (file <- files) {
    val p = new Path(file)
    if (fs.exists(p)) {
      fs.delete(p, true)
      println("Deleted: " + file)
    } else {
      println("Error, Unable to delete:" + file)
    }
  }
} catch {
  case e: Exception => println("Exception while deleting files");
} finally {
  if (fs != null) fs.close()
}}

文件传输方式:

import org.apache.hadoop.fs.{LocatedFileStatus, Path, RemoteIterator, FileSystem, FileUtil}
def moveFiles(sc: SparkContext, hdfsFiles: String, toLoc:String): Unit = {
val files = hdfsFiles.split(FILE_SEPERATOR)
var fs: FileSystem = null
var fu: FileUtil = null
try {
  fs = path.getFileSystem(sc.hadoopConfiguration)
  if(!fs.exists(new Path(toLoc))) fs.mkdirs(new Path(toLoc))
  for (file <- files) {
    val p = new Path(file)
    if (fs.exists(p)) {
      //println("Move successful: " +  fs.rename(p, new Path(toLoc + file.substring(file.lastIndexOf('/') + 1)+ '/')))
      FileUtil.copy(srcFS, src, dsdtFS, dst, false, sc.hadoopConfiguration)
    } else {
      println("Error, File does not exist:" + file)
    }
  }
} catch {
  case e: Exception => println("Exception while moving files");
} finally {
  if (fs != null) fs.close()
}}
4

0 回答 0