
Consider the following example:

val rdd1 = sc.textFile(...)
val rdd2 = sc.textFile(...)

val a = rdd1.doSomeTransformation
val b = rdd1.doAnotherTransformation 

val c = rdd2.doSomeTransformation
val d = rdd2.doAnotherTransformation 

//nonsense code, just to illustrate that it's all part of a big DAG (or so I think)
val vertices = a.join(b)

val edges = c.join(d) //corrected (thanks Justin!)

val graph = new Graph(vertices, edges) //or something like this 

graph.cache()

graph.triplets.collect() // first "materialization"

graph.triplets.collect() // second "materialization"

My questions are:

If I don't cache rdd1 and rdd2, will they each be reloaded twice during the "first materialization"?

If I do cache them, doesn't that duplicate the data? Is there a way to cache the data only temporarily? For example, keep a partition cached only until the graph is cached, and once the graph is fully cached, unpersist the RDDs that were used to create it. Is that possible?

Edit: removed the bloated verbosity and focused the question on a single topic.


2 Answers


You're right, this will run twice, because the DAG looks like this:

a = textFile1->doSomeTransformation
b = textFile1->doAnotherTransformation
c = textFile2->doSomeTransformation
d = textFile2->doAnotherTransformation
vertices = textFile1->doSomeTransformation | textFile1->doAnotherTransformation
edges = textFile2->doSomeTransformation | textFile2->doAnotherTransformation

Note that, yes, there is commonality there, but AFAIK Spark doesn't handle that case when joining. Spark SQL might, in the Catalyst optimizer part... but I doubt it. Part of the reason is that implicitly caching data could mess up the memory-storage calculations and evict cached data you expect to be there. Your best bet is to rewrite it as follows:

val rdd1 = sc.textFile(...)
             .cache()
val rdd2 = sc.textFile(...)
             .cache()

val a = rdd1.doSomeTransformation
val b = rdd1.doAnotherTransformation 

val c = rdd2.doSomeTransformation
val d = rdd2.doAnotherTransformation 


val vertices = a.join(b)
val edges = c.join(d)
val graph = new Graph(vertices, edges) //or something like this 
graph.cache()

graph.triplets.collect() // first "materialization"
graph.triplets.collect() // second "materialization"

rdd1.unpersist()
rdd2.unpersist()

I'd have to double-check, but there shouldn't be the double caching you're worried about: graph.cache will piggyback on top of the textFile caches.
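
As a quick sanity check, here is a minimal sketch (my own, not from the original answer) of how to confirm that, once the graph is materialized and the inputs are unpersisted, only the graph's internal RDDs remain in storage. It uses sc.getPersistentRDDs and sc.getRDDStorageInfo, the same APIs the other answer's test code relies on:

// Sketch only, assuming the rewritten code above (rdd1/rdd2 cached, graph built on top of them).
graph.triplets.collect()   // force materialization while rdd1 and rdd2 are still cached

rdd1.unpersist()
rdd2.unpersist()

// After unpersisting the inputs, only the graph's internal VertexRDD/EdgeRDD entries
// should still show up here.
sc.getPersistentRDDs.foreach { case (id, rdd) =>
  println(s"still persisted: id=$id name=${rdd.name}")
}
sc.getRDDStorageInfo.foreach(info => println(info))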

That said, now that I focus on the fact that you're not chaining but instead performing distinct computations, it is an interesting idea that could be turned on in a config or something. However, such a feature would have a lot of corner cases (does it persist only for that DAG, or should it be aware that a call might come in the future?). It would have to be something like spark.optimization.cacheDAGCommonalities.

That being said, if an RDD is "hot", I have seen it drop dramatically on subsequent requests (i.e. textFile1 takes 10 minutes, but the next iteration only takes 3-4 minutes).

Answered 2015-03-10T04:20:17.033

Unfortunately, it does seem to load rdd1 and rdd2 twice. I was hoping it wouldn't (the commenters really got my hopes up; thanks Soumya for mentioning narrow dependencies, I'll try to see whether I can refactor my code to take advantage of that somehow). I assume this is something future versions of Spark will optimize to eliminate the double loading, but currently it doesn't seem to do so.
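
As an aside, here is a small sketch of my own (not part of the experiment below) showing how the lineage and dependency types can be inspected directly with RDD.toDebugString and RDD.dependencies; it is one way to see that both branches trace back to the same textFile and that the map steps are narrow dependencies:

// Sketch only: `a` and `vertices` refer to the RDDs built in the test code below.
import org.apache.spark.ShuffleDependency

// Prints the whole lineage; both join inputs lead back to the same vertices.txt HadoopRDD.
println(vertices.toDebugString)

// `a` is a plain map over rdd1, so its single dependency is narrow (OneToOneDependency).
a.dependencies.foreach {
  case _: ShuffleDependency[_, _, _] => println("wide (shuffle) dependency")
  case dep                           => println(s"narrow dependency: ${dep.getClass.getSimpleName}")
}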

Here is a simple experiment that proves it. (The TrieMap and the AtomicInteger are for illustration purposes only; since this runs locally it wouldn't work on a cluster AFAIK, even though both are Serializable :). In any case, it's easy to see that the files are loaded twice even without them; they are just the cherry on top, since the printlns alone show that each file RDD is computed twice.)

To explain what we see: this is just a slight elaboration of the code in the question. I create two file RDDs, apply branching transformations to them (a map on each, then a join, etc.), then build a Graph based on those RDDs and cache it (it is cached by default, but an explicit call was added just to make it more readable).

Then I call graph.triplets.collect, which loads the whole RDD DAG.

(Environment: Spark 1.2.1, Scala 2.11.5, Windows 7 64-bit)

The files I used are tiny, with only 2 partitions each, so the printlns show that each file was loaded twice (we see each path + partition index combination appear twice):

**** read file: path: c:/temp/vertices.txt partition index: 1
**** read file: path: c:/temp/vertices.txt partition index: 0
**** read file: path: c:/temp/vertices.txt partition index: 1
**** read file: path: c:/temp/vertices.txt partition index: 0
**** read file: path: c:/temp/edges.txt partition index: 0
**** read file: path: c:/temp/edges.txt partition index: 1
**** read file: path: c:/temp/edges.txt partition index: 0
**** read file: path: c:/temp/edges.txt partition index: 1

It should have looked like this:

**** read file: path: c:/temp/vertices.txt partition index: 1
**** read file: path: c:/temp/vertices.txt partition index: 0
**** read file: path: c:/temp/edges.txt partition index: 0
**** read file: path: c:/temp/edges.txt partition index: 1


The full test code:

// scalastyle:off

import java.util.concurrent.atomic.AtomicInteger

import org.apache.hadoop.mapred.{FileInputFormat, InputFormat, JobConf}
import org.apache.log4j.{Level, Logger}
import org.apache.spark.SparkContext._
import org.apache.spark.graphx.{Edge, Graph}
import org.apache.spark.rdd.{HadoopRDD, RDD}
import org.apache.spark.{InterruptibleIterator, Partition, SerializableWritable, SparkContext, TaskContext}

import scala.collection.concurrent.TrieMap

object CacheTest {

  // I think this only works when running locally ;) but still helps prove the point
  val numFileWasRead = TrieMap[String, AtomicInteger]()

  def main(args: Array[String]) {
    Logger.getRootLogger.setLevel(Level.WARN)

    val sc = new SparkContext("local[4]", "Cache Test") {
      override def textFile(path: String, minPartitions: Int = defaultMinPartitions): RDD[String] =
        super.textFile(path, minPartitions)


      override def hadoopFile[K, V](
                                     path: String,
                                     inputFormatClass: Class[_ <: InputFormat[K, V]],
                                     keyClass: Class[K],
                                     valueClass: Class[V],
                                     minPartitions: Int = defaultMinPartitions
                                     ): RDD[(K, V)] = {

        // A Hadoop configuration can be about 10 KB, which is pretty big, so broadcast it.
        val confBroadcast = broadcast(new SerializableWritable(hadoopConfiguration))
        val setInputPathsFunc = (jobConf: JobConf) => FileInputFormat.setInputPaths(jobConf, path)
        new HadoopRDD(
          this,
          confBroadcast,
          Some(setInputPathsFunc),
          inputFormatClass,
          keyClass,
          valueClass,
          minPartitions) {
          override def compute(theSplit: Partition, context: TaskContext): InterruptibleIterator[(K, V)] = {

            val index = theSplit.index
            if(index == 0) {
              numFileWasRead.getOrElseUpdate(path, new AtomicInteger(0)).incrementAndGet()
            }
            println(s"\r**** read file: path: $path partition index: $index")
            val bytesRead = context.taskMetrics().inputMetrics.map(metrics =>
              println(metrics.bytesRead))

            super.compute(theSplit, context)
          }
        }.setName(path)
      }
    }
    val vFileName = "c:/temp/vertices.txt"
    val eFileName = "c:/temp/edges.txt"
    val rdd1 = sc.textFile(vFileName)
    val rdd2 = sc.textFile(eFileName)

    val a = rdd1.map(x => {
      val xLong = x.toLong
      xLong -> xLong * 2
    })

    val b = rdd1.map(x => {
      val xLong = x.toLong
      xLong -> xLong * 2
    })

    val c = for {
      row <- rdd2
      Array(left, _) = row.split(" ")
    } yield {
      left.toLong
    }

    sc.setJobGroup("mapping rdd2 to d", "")

    val d = for {
      row <- rdd2
      Array(_, right) = row.split(" ")
    } yield {
      right.toLong
    }

    val vertices = a.join(b).map(x => x._1 -> "foo")

    val edges = c zip d map {
      case (left, right) => Edge(left, right, "N/A")
    }
    val graph = Graph(vertices, edges) // graph is automatically caching vertices and edges
    graph.cache() // this is a futile call, just in case you don't believe me (look at Graph's source...)

    val rdds = List[RDD[_]](rdd1,    rdd2,   a,   b,   c,   d,   vertices,   edges,   graph.vertices,   graph.edges,  graph.triplets)
    val rddsNames =    List("rdd1", "rdd2", "a", "b", "c", "d", "vertices", "edges", "graph.vertices", "graph.edges", "graph.triplets")
    val rddNameById = (rdds zip rddsNames).map(x => x._1.id -> x._2).toMap

    def printCachedInformation(intro: String): Unit = {
      println("\n\n" + intro.toUpperCase + "\n\n")
      def displayRDDName(id: Int): String = {
        rddNameById.getOrElse(id, "N/A") + s"(" + id + ")"
      }
      println("sc.getPersistentRDDs: \n" + sc.getPersistentRDDs.map(x => {
        val id = x._1
        displayRDDName(id) -> x._2
      }).mkString("\n"))
      val storageInfo = sc.getRDDStorageInfo
      val storageInfoString = if (storageInfo.isEmpty) " Empty "
      else storageInfo.map(x => {
        val id = x.id
        displayRDDName(id) -> x
      }).mkString("\n")
      println("sc.getRDDStorageInfo: \n" + storageInfoString)
    }

    printCachedInformation("before collect")
    println("\n\nCOLLECTING...\n\n")
    graph.triplets.collect()
    printCachedInformation("after collect")
    //subsequent calls to collect will take it from the Graph's cache so no point in continuing

    println("\n\nSUMMARY\n\n")

    for((file, timesRead) <- numFileWasRead) {
      println(s"file: $file was read ${timesRead.get()} times")
    }

  }
}

Output:


BEFORE COLLECT


sc.getPersistentRDDs:
(N/A(23),VertexRDD, VertexRDD ZippedPartitionsRDD2[23] at zipPartitions at VertexRDD.scala:296)
(N/A(26),EdgeRDD MapPartitionsRDD[26] at mapPartitions at EdgeRDDImpl.scala:108)
(N/A(16),EdgeRDD, EdgeRDD MapPartitionsRDD[16] at mapPartitionsWithIndex at EdgeRDD.scala:104)
sc.getRDDStorageInfo:
 Empty 


COLLECTING...


**** read file: path: c:/temp/vertices.txt partition index: 1
**** read file: path: c:/temp/vertices.txt partition index: 0
**** read file: path: c:/temp/vertices.txt partition index: 1
**** read file: path: c:/temp/vertices.txt partition index: 0
**** read file: path: c:/temp/edges.txt partition index: 0
**** read file: path: c:/temp/edges.txt partition index: 1
**** read file: path: c:/temp/edges.txt partition index: 0
**** read file: path: c:/temp/edges.txt partition index: 1


AFTER COLLECT


sc.getPersistentRDDs:
(N/A(23),VertexRDD, VertexRDD ZippedPartitionsRDD2[23] at zipPartitions at VertexRDD.scala:296)
(N/A(26),EdgeRDD MapPartitionsRDD[26] at mapPartitions at EdgeRDDImpl.scala:108)
(N/A(16),EdgeRDD, EdgeRDD MapPartitionsRDD[16] at mapPartitionsWithIndex at EdgeRDD.scala:104)
sc.getRDDStorageInfo:
(N/A(23),RDD "VertexRDD, VertexRDD" (23) StorageLevel: StorageLevel(false, true, false, true, 1); CachedPartitions: 2; TotalPartitions: 2; MemorySize: 3.0 KB; TachyonSize: 0.0 B; DiskSize: 0.0 B)
(N/A(26),RDD "EdgeRDD" (26) StorageLevel: StorageLevel(false, true, false, true, 1); CachedPartitions: 2; TotalPartitions: 2; MemorySize: 5.5 KB; TachyonSize: 0.0 B; DiskSize: 0.0 B)
(N/A(16),RDD "EdgeRDD, EdgeRDD" (16) StorageLevel: StorageLevel(false, true, false, true, 1); CachedPartitions: 2; TotalPartitions: 2; MemorySize: 5.5 KB; TachyonSize: 0.0 B; DiskSize: 0.0 B)


SUMMARY


file: c:/temp/edges.txt was read 2 times
file: c:/temp/vertices.txt was read 2 times

Process finished with exit code 0

Input:

edges.txt

1 2
2 3
3 4
4 1
2 5
5 6
1 3
3 6
6 1
1 7
7 8
8 4
8 9
9 10
10 11
11 12
12 9
12 13
13 14
14 15
15 16
16 17
17 18
18 19
19 20
20 18
19 17

vertices.txt

1
2
3
4
2
5
1
3
6
1
7
8
8
9
10
11
12
12
13
14
15
16
17
18
19
20
19
Answered 2015-03-10T01:34:24.867