希望这个 Spark Scala 代码可能会有所帮助。
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{Path, FileSystem}
import org.apache.spark.deploy.SparkHadoopUtil
import org.apache.spark.sql.execution.datasources.InMemoryFileIndex
import java.net.URI
def listFiles(basep: String, globp: String): Seq[String] = {
val conf = new Configuration(sc.hadoopConfiguration)
val fs = FileSystem.get(new URI(basep), conf)
def validated(path: String): Path = {
if(path startsWith "/") new Path(path)
else new Path("/" + path)
}
val fileCatalog = InMemoryFileIndex.bulkListLeafFiles(
paths = SparkHadoopUtil.get.globPath(fs, Path.mergePaths(validated(basep), validated(globp))),
hadoopConf = conf,
filter = null,
sparkSession = spark)
fileCatalog.flatMap(_._2.map(_.path))
}
val root = "/mnt/{path to your file directory}"
val globp = "[^_]*"
val files = listFiles(root, globp)
val paths=files.toVector
循环向量以读取多个文件:
for (path <- paths) {
print(path.toString)
val df= spark.read.
format("com.crealytics.spark.excel").
option("useHeader", "true").
option("treatEmptyValuesAsNulls", "false").
option("inferSchema", "false").
option("addColorColumns", "false").
load(path.toString)
}