I am trying to implement random walk with restart by modifying the Spark GraphX implementation of the PageRank algorithm.
    def randomWalkWithRestart(graph: Graph[VertexProperty, EdgeProperty], patientID: String, numIter: Int = 10, alpha: Double = 0.15, tol: Double = 0.01): Unit = {
      var rankGraph: Graph[Double, Double] = graph
        // Associate the degree with each vertex
        .outerJoinVertices(graph.outDegrees) { (vid, vdata, deg) => deg.getOrElse(0) }
        // Set the weight on the edges based on the degree
        .mapTriplets(e => 1.0 / e.srcAttr, TripletFields.Src)
        // Set the vertex attributes to the initial pagerank values
        .mapVertices((id, attr) => alpha)

      var iteration = 0
      var prevRankGraph: Graph[Double, Double] = null
      while (iteration < numIter) {
        rankGraph.cache()

        // Compute the outgoing rank contributions of each vertex, perform local preaggregation, and
        // do the final aggregation at the receiving vertices. Requires a shuffle for aggregation.
        val rankUpdates = rankGraph.aggregateMessages[Double](
          ctx => ctx.sendToDst(ctx.srcAttr * ctx.attr), _ + _, TripletFields.Src)

        // Apply the final rank updates to get the new ranks, using join to preserve ranks of vertices
        // that didn't receive a message. Requires a shuffle for broadcasting updated ranks to the
        // edge partitions.
        prevRankGraph = rankGraph
        rankGraph = rankGraph.joinVertices(rankUpdates) {
          (id, oldRank, msgSum) => alpha + (1.0 - alpha) * msgSum
        }.cache()

        rankGraph.edges.foreachPartition(x => {}) // also materializes rankGraph.vertices
        //logInfo(s"PageRank finished iteration $iteration.")
        prevRankGraph.vertices.unpersist(false)
        prevRankGraph.edges.unpersist(false)

        iteration += 1
      }
    }
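I believe the update function passed to joinVertices, (id, oldRank, msgSum) => alpha + (1.0 - alpha) * msgSum, is the part that should be changed, but I am not sure how. I need to add the steady-state probabilities to this line.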
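One possible way to read this requirement, offered as a sketch rather than a tested answer: in random walk with restart, the constant alpha term is usually replaced by alpha times a restart vector that is 1.0 at the seed vertex and 0.0 everywhere else, which is similar to what GraphX's personalized PageRank does. The version below assumes the patient has already been mapped to a VertexId (the sourceId parameter and the generic VD/ED types are hypothetical, not part of the code above). It also swaps joinVertices for outerJoinVertices so that vertices receiving no message are still updated.

```scala
import scala.reflect.ClassTag
import org.apache.spark.graphx._

// Sketch only: `sourceId` (the restart vertex) is an assumed parameter;
// mapping patientID to a VertexId is left to the caller.
def randomWalkWithRestart[VD: ClassTag, ED: ClassTag](
    graph: Graph[VD, ED],
    sourceId: VertexId,
    numIter: Int = 10,
    alpha: Double = 0.15): Graph[Double, Double] = {

  // Restart vector: 1.0 at the source vertex, 0.0 everywhere else.
  val restart: VertexId => Double = id => if (id == sourceId) 1.0 else 0.0

  var rankGraph: Graph[Double, Double] = graph
    // Associate the out-degree with each vertex
    .outerJoinVertices(graph.outDegrees) { (vid, vdata, deg) => deg.getOrElse(0) }
    // Edge weight = 1 / out-degree of the source vertex
    .mapTriplets(e => 1.0 / e.srcAttr, TripletFields.Src)
    // All probability mass starts on the source vertex
    .mapVertices((id, attr) => restart(id))

  var iteration = 0
  while (iteration < numIter) {
    rankGraph.cache()

    // Sum the probability flowing into each vertex along its in-edges
    val rankUpdates = rankGraph.aggregateMessages[Double](
      ctx => ctx.sendToDst(ctx.srcAttr * ctx.attr), _ + _, TripletFields.Src)

    val prevRankGraph = rankGraph
    // Restart term applies only at the source; vertices without messages get 0.0 inflow
    rankGraph = rankGraph.outerJoinVertices(rankUpdates) {
      (id, oldRank, msgSum) => alpha * restart(id) + (1.0 - alpha) * msgSum.getOrElse(0.0)
    }.cache()

    rankGraph.edges.foreachPartition(x => {}) // also materializes rankGraph.vertices
    prevRankGraph.vertices.unpersist(false)
    prevRankGraph.edges.unpersist(false)

    iteration += 1
  }
  rankGraph
}
```

Returning the graph instead of Unit is a deliberate change in this sketch, so the resulting probabilities can be read from the vertices of the returned graph.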
Moreover, the steady-state probabilities should be initialized somewhere before the while loop, and they must be updated inside the while loop.
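To make the "initialize before the loop, update inside the loop" idea concrete without Spark, here is a minimal plain-Scala sketch on a toy adjacency map. The names RwrSketch, rwr, adj and source are hypothetical, and the sketch assumes every vertex appears as a key with at least one out-neighbour. It also shows one way the otherwise unused tol parameter could stop the loop early.

```scala
object RwrSketch {
  // adj: vertex -> out-neighbours (assumed non-empty for every vertex)
  def rwr(adj: Map[Int, Seq[Int]], source: Int, alpha: Double,
          numIter: Int, tol: Double): Map[Int, Double] = {
    // Restart vector: all mass on the source vertex
    val restart: Map[Int, Double] =
      adj.keys.map(v => v -> (if (v == source) 1.0 else 0.0)).toMap

    // Probabilities are initialized before the loop
    var p: Map[Int, Double] = restart
    var iteration = 0
    var delta = Double.MaxValue

    while (iteration < numIter && delta > tol) {
      // Each vertex splits its current probability evenly over its out-edges
      val msgs: Map[Int, Double] = adj.toSeq
        .flatMap { case (src, dsts) => dsts.map(dst => dst -> p(src) / dsts.size) }
        .groupBy(_._1)
        .map { case (dst, xs) => dst -> xs.map(_._2).sum }

      // Updated inside the loop: restart term plus damped inflow
      val next = restart.map { case (v, e) =>
        v -> (alpha * e + (1.0 - alpha) * msgs.getOrElse(v, 0.0))
      }

      // L1 change between successive iterations, used as the stopping test
      delta = next.map { case (v, x) => math.abs(x - p(v)) }.sum
      p = next
      iteration += 1
    }
    p
  }
}
```

For example, RwrSketch.rwr(Map(0 -> Seq(1), 1 -> Seq(2), 2 -> Seq(0)), source = 0, alpha = 0.15, numIter = 200, tol = 1e-12) runs the walk on a three-vertex cycle restarted at vertex 0.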
Any suggestions would be appreciated.