我刚开始使用 Spark 和 Scala
我有一个包含多个文件的目录,我使用成功加载它们
sc.wholeTextFiles(directory)
现在我想更上一层楼。我实际上有一个目录,其中包含包含文件的子目录。我的目标是获得一个,RDD[(String,String)]
以便我可以继续前进,其中RDD
代表文件的名称和内容。
我尝试了以下方法:
val listOfFolders = getListOfSubDirectories(rootFolder)
val input = listOfFolders.map(directory => sc.wholeTextFiles(directory))
但我得到了Seq[RDD[(String,String)]]
如何将其Seq
转换为RDD[(String,String)]
?
或者也许我没有做正确的事情,我应该尝试不同的方法?
编辑:添加代码
// HADOOP VERSION
val rootFolderHDFS = "hdfs://****/"
val hdfsURI = "hdfs://****/**/"
// returns a list of folders (currently about 800)
val listOfFoldersHDFS = ListDirectoryContents.list(hdfsURI,rootFolderHDFS)
val inputHDFS = listOfFoldersHDFS.map(directory => sc.wholeTextFiles(directory))
// RDD[(String,String)]
// val inputHDFS2 = inputHDFS.reduceRight((rdd1,rdd2) => rdd2 ++ rdd1)
val init = sc.parallelize(Array[(String, String)]())
val inputHDFS2 = inputHDFS.foldRight(init)((rdd1,rdd2) => rdd2 ++ rdd1)
// returns org.apache.spark.SparkException: Job aborted due to stage failure: Task serialization failed: java.lang.StackOverflowError
println(inputHDFS2.count)