Unfortunately, it does seem to load rdd1 and rdd2 twice. I was hoping it wouldn't (the commenters had high hopes for this too; thanks Soumya for mentioning narrow dependencies, I'll try to see if I can refactor my code to take advantage of that somehow). I suspect a future version of Spark may optimize away the double loading, but currently it does not seem to do so.
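As an aside, you can see the shape of the problem without any instrumentation by printing the lineage of the final RDD: the same source RDD appears under both branches of the join, and since nothing in between is persisted, each branch recomputes it all the way back to the file. A minimal sketch using the standard RDD.toDebugString API (sc and the path as in the test code below):

val rdd1 = sc.textFile("c:/temp/vertices.txt")
val joined = rdd1.map(x => x.toLong -> 1).join(rdd1.map(x => x.toLong -> 2))
// The same HadoopRDD appears under both branches of the printed DAG;
// without a cache() on rdd1, each branch re-reads the file.
println(joined.toDebugString)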
Here is a simple experiment that proves it (the TrieMap and the AtomicInteger are for illustration purposes only; since this runs locally, they wouldn't work on a cluster AFAIK, even though both are Serializable :). In any case, it's easy to see the file is loaded twice even without them; they are just the cherry on top, and the printlns alone show that each file RDD is computed twice.
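For what it's worth, a cluster-safe variant of the counter could replace the driver-local TrieMap with a Spark accumulator, whose updates are shipped back to the driver. A rough sketch against the Spark 1.x accumulator API (with the caveat that updates made inside transformations can over-count when tasks are retried):

// Hypothetical cluster-safe replacement for the TrieMap counter below:
val vertexReads = sc.accumulator(0, "vertices.txt reads") // one accumulator per file
// Inside the overridden compute(), instead of updating the TrieMap:
//   if (index == 0) vertexReads += 1
// After the action completes, vertexReads.value is the number of full passes over the file.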
To explain what we're seeing: this is just an elaboration of the code in the question. I create 2 file RDDs, apply branched transformations to them (a map on each, then a join, etc.), then build a Graph based on those RDDs and cache it (it's cached by default, but an explicit call was added just to make that more readable). Then I call graph.triplets.collect, which loads the entire RDD DAG.
(Environment: Spark 1.2.1, Scala 2.11.5, Windows 7 64-bit)
The files I used are tiny, with only 2 partitions each, so the printlns show that each file was loaded twice (we see each path + partition-index combination appear twice):
**** read file: path: c:/temp/vertices.txt partition index: 1
**** read file: path: c:/temp/vertices.txt partition index: 0
**** read file: path: c:/temp/vertices.txt partition index: 1
**** read file: path: c:/temp/vertices.txt partition index: 0
**** read file: path: c:/temp/edges.txt partition index: 0
**** read file: path: c:/temp/edges.txt partition index: 1
**** read file: path: c:/temp/edges.txt partition index: 0
**** read file: path: c:/temp/edges.txt partition index: 1
It should have looked like this:
**** read file: path: c:/temp/vertices.txt partition index: 1
**** read file: path: c:/temp/vertices.txt partition index: 0
**** read file: path: c:/temp/edges.txt partition index: 0
**** read file: path: c:/temp/edges.txt partition index: 1
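For completeness, the straightforward workaround is to persist the two source RDDs before branching on them, so that the second traversal of the lineage is served from the block manager instead of re-reading the file. A minimal sketch (same names as the test code below, nothing else changed):

val rdd1 = sc.textFile(vFileName).cache() // or persist(StorageLevel.MEMORY_ONLY)
val rdd2 = sc.textFile(eFileName).cache()
// Build a, b, c, d and the Graph exactly as below; each "**** read file"
// line should then appear only once per path + partition combination.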
The full test code:
// scalastyle:off
import java.util.concurrent.atomic.AtomicInteger
import org.apache.hadoop.mapred.{FileInputFormat, InputFormat, JobConf}
import org.apache.log4j.{Level, Logger}
import org.apache.spark.SparkContext._
import org.apache.spark.graphx.{Edge, Graph}
import org.apache.spark.rdd.{HadoopRDD, RDD}
import org.apache.spark.{InterruptibleIterator, Partition, SerializableWritable, SparkContext, TaskContext}

import scala.collection.concurrent.TrieMap

object CacheTest {
  // I think this only works when running locally ;) but still helps prove the point
  val numFileWasRead = TrieMap[String, AtomicInteger]()

  def main(args: Array[String]) {
    Logger.getRootLogger.setLevel(Level.WARN)
    val sc = new SparkContext("local[4]", "Cache Test") {
      override def textFile(path: String, minPartitions: Int = defaultMinPartitions): RDD[String] =
        super.textFile(path, minPartitions)

      override def hadoopFile[K, V](
          path: String,
          inputFormatClass: Class[_ <: InputFormat[K, V]],
          keyClass: Class[K],
          valueClass: Class[V],
          minPartitions: Int = defaultMinPartitions): RDD[(K, V)] = {
        // A Hadoop configuration can be about 10 KB, which is pretty big, so broadcast it.
        val confBroadcast = broadcast(new SerializableWritable(hadoopConfiguration))
        val setInputPathsFunc = (jobConf: JobConf) => FileInputFormat.setInputPaths(jobConf, path)
        new HadoopRDD(
          this,
          confBroadcast,
          Some(setInputPathsFunc),
          inputFormatClass,
          keyClass,
          valueClass,
          minPartitions) {
          override def compute(theSplit: Partition, context: TaskContext): InterruptibleIterator[(K, V)] = {
            val index = theSplit.index
            if (index == 0) {
              numFileWasRead.getOrElseUpdate(path, new AtomicInteger(0)).incrementAndGet()
            }
            println(s"\r**** read file: path: $path partition index: $index")
            val bytesRead = context.taskMetrics().inputMetrics.map(metrics =>
              println(metrics.bytesRead))
            super.compute(theSplit, context)
          }
        }.setName(path)
      }
    }

    val vFileName = "c:/temp/vertices.txt"
    val eFileName = "c:/temp/edges.txt"
    val rdd1 = sc.textFile(vFileName)
    val rdd2 = sc.textFile(eFileName)
    val a = rdd1.map(x => {
      val xLong = x.toLong
      xLong -> xLong * 2
    })
    val b = rdd1.map(x => {
      val xLong = x.toLong
      xLong -> xLong * 2
    })
    val c = for {
      row <- rdd2
      Array(left, _) = row.split(" ")
    } yield {
      left.toLong
    }
    sc.setJobGroup("mapping rdd2 to d", "")
    val d = for {
      row <- rdd2
      Array(_, right) = row.split(" ")
    } yield {
      right.toLong
    }
    val vertices = a.join(b).map(x => x._1 -> "foo")
    val edges = c zip d map {
      case (left, right) => Edge(left, right, "N/A")
    }
    val graph = Graph(vertices, edges) // graph is automatically caching vertices and edges
    graph.cache() // this is a futile call, just in case you don't believe me (look at Graph's source...)
    val rdds = List[RDD[_]](rdd1, rdd2, a, b, c, d, vertices, edges, graph.vertices, graph.edges, graph.triplets)
    val rddsNames = List("rdd1", "rdd2", "a", "b", "c", "d", "vertices", "edges", "graph.vertices", "graph.edges", "graph.triplets")
    val rddNameById = (rdds zip rddsNames).map(x => x._1.id -> x._2).toMap

    def printCachedInformation(intro: String): Unit = {
      println("\n\n" + intro.toUpperCase + "\n\n")
      def displayRDDName(id: Int): String = {
        rddNameById.getOrElse(id, "N/A") + s"(" + id + ")"
      }
      println("sc.getPersistentRDDs: \n" + sc.getPersistentRDDs.map(x => {
        val id = x._1
        displayRDDName(id) -> x._2
      }).mkString("\n"))
      val storageInfo = sc.getRDDStorageInfo
      val storageInfoString = if (storageInfo.isEmpty) " Empty "
      else storageInfo.map(x => {
        val id = x.id
        displayRDDName(id) -> x
      }).mkString("\n")
      println("sc.getRDDStorageInfo: \n" + storageInfoString)
    }

    printCachedInformation("before collect")
    println("\n\nCOLLECTING...\n\n")
    graph.triplets.collect()
    printCachedInformation("after collect")
    // subsequent calls to collect will take it from the Graph's cache so no point in continuing
    println("\n\nSUMMARY\n\n")
    for ((file, timesRead) <- numFileWasRead) {
      println(s"file: $file was read ${timesRead.get()} times")
    }
  }
}
The output
BEFORE COLLECT
sc.getPersistentRDDs:
(N/A(23),VertexRDD, VertexRDD ZippedPartitionsRDD2[23] at zipPartitions at VertexRDD.scala:296)
(N/A(26),EdgeRDD MapPartitionsRDD[26] at mapPartitions at EdgeRDDImpl.scala:108)
(N/A(16),EdgeRDD, EdgeRDD MapPartitionsRDD[16] at mapPartitionsWithIndex at EdgeRDD.scala:104)
sc.getRDDStorageInfo:
Empty
COLLECTING...
**** read file: path: c:/temp/vertices.txt partition index: 1
**** read file: path: c:/temp/vertices.txt partition index: 0
**** read file: path: c:/temp/vertices.txt partition index: 1
**** read file: path: c:/temp/vertices.txt partition index: 0
**** read file: path: c:/temp/edges.txt partition index: 0
**** read file: path: c:/temp/edges.txt partition index: 1
**** read file: path: c:/temp/edges.txt partition index: 0
**** read file: path: c:/temp/edges.txt partition index: 1
AFTER COLLECT
sc.getPersistentRDDs:
(N/A(23),VertexRDD, VertexRDD ZippedPartitionsRDD2[23] at zipPartitions at VertexRDD.scala:296)
(N/A(26),EdgeRDD MapPartitionsRDD[26] at mapPartitions at EdgeRDDImpl.scala:108)
(N/A(16),EdgeRDD, EdgeRDD MapPartitionsRDD[16] at mapPartitionsWithIndex at EdgeRDD.scala:104)
sc.getRDDStorageInfo:
(N/A(23),RDD "VertexRDD, VertexRDD" (23) StorageLevel: StorageLevel(false, true, false, true, 1); CachedPartitions: 2; TotalPartitions: 2; MemorySize: 3.0 KB; TachyonSize: 0.0 B; DiskSize: 0.0 B)
(N/A(26),RDD "EdgeRDD" (26) StorageLevel: StorageLevel(false, true, false, true, 1); CachedPartitions: 2; TotalPartitions: 2; MemorySize: 5.5 KB; TachyonSize: 0.0 B; DiskSize: 0.0 B)
(N/A(16),RDD "EdgeRDD, EdgeRDD" (16) StorageLevel: StorageLevel(false, true, false, true, 1); CachedPartitions: 2; TotalPartitions: 2; MemorySize: 5.5 KB; TachyonSize: 0.0 B; DiskSize: 0.0 B)
SUMMARY
file: c:/temp/edges.txt was read 2 times
file: c:/temp/vertices.txt was read 2 times
Process finished with exit code 0
The input
edges.txt
1 2
2 3
3 4
4 1
2 5
5 6
1 3
3 6
6 1
1 7
7 8
8 4
8 9
9 10
10 11
11 12
12 9
12 13
13 14
14 15
15 16
16 17
17 18
18 19
19 20
20 18
19 17
vertices.txt
1
2
3
4
2
5
1
3
6
1
7
8
8
9
10
11
12
12
13
14
15
16
17
18
19
20
19