我们的任务是使用 Pregel API 为 3lac 顶点找到最短路径。我们应该将每个顶点作为源顶点一次,并确定所有这些执行中的最短路径。我的代码如下所示,
def shortestPath(sc: SparkContext, mainGraph: Graph[(String, String, Double), Double], singleSourceVertexFlag: Boolean) {
var noOfIterations = mainGraph.vertices.count();
// If single source vertext is true, pass only count as one iteration only
if (singleSourceVertexFlag) {
noOfIterations = 1
} else { // else loop through complete list of vertices
noOfIterations = mainGraph.vertices.count()
}
for (i <- 0 to (noOfIterations.toInt - 1)) {
val sourceId: VertexId = i
val modGraph = mainGraph.mapVertices((id, attr) =>
if (id == sourceId) (0.0)
else (Double.PositiveInfinity))
val loopItrCount = modGraph.vertices.count().toInt;
val sssp = modGraph.pregel(Double.PositiveInfinity, loopItrCount, EdgeDirection.Out)(
(id, dist, newDist) =>
if (dist < newDist) dist
else newDist, // Vertex Program
triplet => { // Send Message
if (triplet.srcAttr + triplet.attr < triplet.dstAttr) {
Iterator((triplet.dstId, triplet.srcAttr + triplet.attr))
} else {
Iterator.empty
}
},
(a, b) =>
if (a < b) a // Merge Message
else b)
sssp.unpersist(true)
modGraph.unpersist(true)
println("****Shortest Path End**** SourceId" + sourceId)
}
}
从这段代码中,我必须从每个循环中读取最短路径,并从中确定最小值作为最终输出(这是未来的部分,我还没有编写相同的代码)。
现在这个当前代码适用于 15 节点图和 1112 节点图。但是当我尝试为 22k 节点图执行算法时,该算法针对 55 个源节点执行,然后因内存不足错误而停止。我们有一个两节点集群(1node - 64GB RAM,2node - 32GB RAM)
问题是,
1. 在 Spark 集群上如何处理 for 循环?我必须在代码中修改什么以优化代码吗?
2. 我正在尝试使用 unpersist 以便在每个循环中清除 RDD 并为每个循环创建新的。但是在执行 55 个节点后我仍然内存不足。应该怎么做才能对所有节点执行相同的操作?