我正在将 spark 工作从私有数据中心迁移到 GCP。早些时候,我使用 hadoop.fs 中的 FileSystem 在 maprfs 中传输和删除文件。但是对于 GCP 存储桶,这是行不通的。我必须将文件从目录 A 传输到同一 GCP 存储桶中的目录 B,然后在传输后从目录 A 中删除文件。以下两种方法在 GCP 中不起作用。您能否建议 scala 中支持传输和删除谷歌云存储桶中文件的任何其他库。
删除方法:
import org.apache.hadoop.fs.{LocatedFileStatus, Path, RemoteIterator, FileSystem, FileUtil}
def deleteFiles(sc: SparkContext, reingestHdfsFiles: String): Unit = {
val files = hdfsFiles.split(FILE_SEPERATOR)
var fs: FileSystem = null
try {
fs = path.getFileSystem(sc.hadoopConfiguration)
for (file <- files) {
val p = new Path(file)
if (fs.exists(p)) {
fs.delete(p, true)
println("Deleted: " + file)
} else {
println("Error, Unable to delete:" + file)
}
}
} catch {
case e: Exception => println("Exception while deleting files");
} finally {
if (fs != null) fs.close()
}}
文件传输方式:
import org.apache.hadoop.fs.{LocatedFileStatus, Path, RemoteIterator, FileSystem, FileUtil}
def moveFiles(sc: SparkContext, hdfsFiles: String, toLoc:String): Unit = {
val files = hdfsFiles.split(FILE_SEPERATOR)
var fs: FileSystem = null
var fu: FileUtil = null
try {
fs = path.getFileSystem(sc.hadoopConfiguration)
if(!fs.exists(new Path(toLoc))) fs.mkdirs(new Path(toLoc))
for (file <- files) {
val p = new Path(file)
if (fs.exists(p)) {
//println("Move successful: " + fs.rename(p, new Path(toLoc + file.substring(file.lastIndexOf('/') + 1)+ '/')))
FileUtil.copy(srcFS, src, dsdtFS, dst, false, sc.hadoopConfiguration)
} else {
println("Error, File does not exist:" + file)
}
}
} catch {
case e: Exception => println("Exception while moving files");
} finally {
if (fs != null) fs.close()
}}