4

我们的任务是使用 Pregel API 为 3lac 顶点找到最短路径。我们应该将每个顶点作为源顶点一次,并确定所有这些执行中的最短路径。我的代码如下所示,

def shortestPath(sc: SparkContext, mainGraph: Graph[(String, String, Double), Double], singleSourceVertexFlag: Boolean) {

var noOfIterations = mainGraph.vertices.count();
// If single source vertext is true, pass only count as one iteration only
if (singleSourceVertexFlag) {
  noOfIterations = 1
} else { // else loop through complete list of vertices
  noOfIterations = mainGraph.vertices.count()
}

for (i <- 0 to (noOfIterations.toInt - 1)) {
  val sourceId: VertexId = i
  val modGraph = mainGraph.mapVertices((id, attr) =>
    if (id == sourceId) (0.0)
    else (Double.PositiveInfinity))

  val loopItrCount = modGraph.vertices.count().toInt;
  val sssp = modGraph.pregel(Double.PositiveInfinity, loopItrCount, EdgeDirection.Out)(

    (id, dist, newDist) =>
      if (dist < newDist) dist
      else newDist, // Vertex Program

    triplet => { // Send Message
      if (triplet.srcAttr + triplet.attr < triplet.dstAttr) {
        Iterator((triplet.dstId, triplet.srcAttr + triplet.attr))
      } else {
        Iterator.empty
      }
    },

    (a, b) =>
      if (a < b) a // Merge Message
      else b)

  sssp.unpersist(true)
  modGraph.unpersist(true)

  println("****Shortest Path End**** SourceId" + sourceId)

}

}

从这段代码中,我必须从每个循环中读取最短路径,并从中确定最小值作为最终输出(这是未来的部分,我还没有编写相同的代码)。

现在这个当前代码适用于 15 节点图和 1112 节点图。但是当我尝试为 22k 节点图执行算法时,该算法针对 55 个源节点执行,然后因内存不足错误而停止。我们有一个两节点集群(1node - 64GB RAM,2node - 32GB RAM)

问题是,
1. 在 Spark 集群上如何处理 for 循环?我必须在代码中修改什么以优化代码吗?
2. 我正在尝试使用 unpersist 以便在每个循环中清除 RDD 并为每个循环创建新的。但是在执行 55 个节点后我仍然内存不足。应该怎么做才能对所有节点执行相同的操作?

4

0 回答 0