4

我刚开始使用 Spark 和 Scala

我有一个包含多个文件的目录,我使用成功加载它们

sc.wholeTextFiles(directory)

现在我想更上一层楼。我实际上有一个目录,其中包含包含文件的子目录。我的目标是获得一个,RDD[(String,String)]以便我可以继续前进,其中RDD代表文件的名称和内容。

我尝试了以下方法:

val listOfFolders = getListOfSubDirectories(rootFolder)
val input = listOfFolders.map(directory => sc.wholeTextFiles(directory))

但我得到了Seq[RDD[(String,String)]] 如何将其Seq转换为RDD[(String,String)]

或者也许我没有做正确的事情,我应该尝试不同的方法?

编辑:添加代码

// HADOOP VERSION
val rootFolderHDFS = "hdfs://****/"
val hdfsURI = "hdfs://****/**/"

// returns a list of folders (currently about 800)
val listOfFoldersHDFS = ListDirectoryContents.list(hdfsURI,rootFolderHDFS)
val inputHDFS = listOfFoldersHDFS.map(directory => sc.wholeTextFiles(directory))
// RDD[(String,String)]
//    val inputHDFS2 = inputHDFS.reduceRight((rdd1,rdd2) => rdd2 ++ rdd1)
val init = sc.parallelize(Array[(String, String)]())
val inputHDFS2 = inputHDFS.foldRight(init)((rdd1,rdd2) => rdd2 ++ rdd1)

// returns org.apache.spark.SparkException: Job aborted due to stage failure: Task serialization failed: java.lang.StackOverflowError
println(inputHDFS2.count)
4

3 回答 3

4

您可以Seq这样减少(将RDDs 与连接++):

val reduced: RDD[(String, String)] = input.reduce((left, right) => left ++ right)

更多细节为什么我们可以在这里应用reduce

  • ++是关联的 - rdda ++ (rddb ++ rddc) 或 (rdda ++ rddb) ++ rddc 无关紧要
  • 假设Seq是非空的(否则fold将是一个更好的选择,它需要一个空RDD[(String, String)]作为初始累加器)。

根据 的确切类型Seq,您可能会得到一个 stackoverflow,所以要小心并使用更大的集合进行测试,尽管对于标准库我认为它是安全的。

于 2014-12-31T15:26:27.573 回答
3

您应该使用unionspark 上下文提供的

val rdds: Seq[RDD[Int]] = (1 to 100).map(i => sc.parallelize(Seq(i)))
val rdd_union: RDD[Int] = sc.union(rdds) 
于 2016-05-05T11:11:03.193 回答
2

您可以使用路径通配符将所有目录加载到单个 RDD 中,而不是将每个目录加载到单独的 RDD 中吗?

鉴于以下目录树...

$ tree test/spark/so
test/spark/so
├── a
│   ├── text1.txt
│   └── text2.txt
└── b
    ├── text1.txt
    └── text2.txt

为目录创建带有通配符的 RDD。

scala> val rdd =  sc.wholeTextFiles("test/spark/so/*/*")
rdd: org.apache.spark.rdd.RDD[(String, String)] = test/spark/so/*/ WholeTextFileRDD[16] at wholeTextFiles at <console>:37

如您所料,计数为 4。

scala> rdd.count
res9: Long = 4

scala> rdd.collect
res10: Array[(String, String)] =
Array((test/spark/so/a/text1.txt,a1
a2
a3), (test/spark/so/a/text2.txt,a3
a4
a5), (test/spark/so/b/text1.txt,b1
b2
b3), (test/spark/so/b/text2.txt,b3
b4
b5))
于 2014-12-31T16:09:56.357 回答