当我尝试使用 Scala 在 Graphx 中实现算法时,我发现无法在下一次迭代中激活所有顶点。如何向我的所有图形顶点发送消息?在我的算法中,所有顶点都应该执行一些超级步骤(无论它们是否收到消息,因为即使没有收到消息也是应该在下一次迭代中处理的事件)。
我在这里给出了在 pregel 的逻辑中实现的 SSSP 算法的官方代码,你可以看到只有收到消息的顶点才会在下一次迭代中执行它们的程序,但是对于我的情况,我希望 pregel 函数迭代运行,即每个超级步骤顶点执行他们的程序,如果需要他们可以投票停止!这个例子中的推理看起来不像 Pregel 的论文逻辑。请对如何实现 Pregel 的真实逻辑有任何想法?
val graph: Graph[Long, Double] =
GraphGenerators.logNormalGraph(sc, numVertices = 100).mapEdges(e => e.attr.toDouble)
val sourceId: VertexId = 42 // The ultimate source
// Initialize the graph such that all vertices except the root have distance infinity.
val initialGraph = graph.mapVertices((id, _) =>
if (id == sourceId) 0.0 else Double.PositiveInfinity)
val sssp = initialGraph.pregel(Double.PositiveInfinity)(
(id, dist, newDist) => math.min(dist, newDist), // Vertex Program
triplet => { // Send Message
if (triplet.srcAttr + triplet.attr < triplet.dstAttr) {
Iterator((triplet.dstId, triplet.srcAttr + triplet.attr))
} else {
Iterator.empty
}
},
(a, b) => math.min(a, b) // Merge Message
)
println(sssp.vertices.collect.mkString("\n"))
}