我正在尝试通过修改 PageRank 算法的 Spark GraphX 实现来实现带重启的随机游走。
/**
 * Iterative rank propagation adapted from the GraphX PageRank loop, intended as a
 * random walk with restart.
 *
 * As written here: every vertex starts with rank `alpha`, every edge is weighted by
 * 1.0 / (out-degree of its source vertex), and on each iteration a vertex that received
 * messages gets the new rank `alpha + (1.0 - alpha) * msgSum`.
 *
 * NOTE(review): `patientID` and `tol` are not referenced anywhere in the visible body, so
 * the restart term is not tied to any particular vertex and there is no convergence check.
 * NOTE(review): the block opened by `joinVertices(...) {` and the enclosing `while` and
 * method bodies are never closed in the visible text -- closing braces appear to be missing.
 * NOTE(review): the declared return type is `Unit`, so the computed ranks are not returned.
 *
 * @param graph     input graph; its vertex and edge attributes are replaced, only the
 *                  out-degrees derived from it are used
 * @param patientID not used in the visible code
 * @param numIter   upper bound on the number of loop iterations
 * @param alpha     initial rank of every vertex and the constant term of the update
 * @param tol       not used in the visible code
 */
def randomWalkWithRestart(graph: Graph[VertexProperty, EdgeProperty], patientID: String , numIter: Int = 10, alpha: Double = 0.15, tol: Double = 0.01): Unit = {
var rankGraph: Graph[Double, Double] = graph
// Replace each vertex attribute with its out-degree; vertices that have no entry in
// graph.outDegrees get 0.
.outerJoinVertices(graph.outDegrees) { (vid, vdata, deg) => deg.getOrElse(0) }
// Set each edge weight to 1.0 / (out-degree of the source vertex); only the source
// attribute is requested via TripletFields.Src.
.mapTriplets( e => 1.0 / e.srcAttr, TripletFields.Src )
// Set every vertex attribute to the initial rank value, alpha.
.mapVertices( (id, attr) => alpha )
var iteration = 0
// Holds the graph from the previous iteration.
// NOTE(review): assigned inside the loop but never read in the visible code.
var prevRankGraph: Graph[Double, Double] = null
while (iteration < numIter) {
// Compute the outgoing rank contributions of each vertex, perform local preaggregation, and
// do the final aggregation at the receiving vertices. Requires a shuffle for aggregation.
// Each edge sends (source rank * edge weight) to its destination; messages are summed.
val rankUpdates = rankGraph.aggregateMessages[Double](
ctx => ctx.sendToDst(ctx.srcAttr * ctx.attr), _ + _, TripletFields.Src)
// Apply the final rank updates to get the new ranks, using join to preserve ranks of vertices
// that didn't receive a message. Requires a shuffle for broadcasting updated ranks to the
// edge partitions.
prevRankGraph = rankGraph
rankGraph = rankGraph.joinVertices(rankUpdates) {
// New rank = constant alpha term + (1.0 - alpha) * sum of incoming contributions.
// The old rank is ignored.
(id, oldRank, msgSum) => alpha + (1.0 - alpha) * msgSum
// NOTE(review): no closing brace for the joinVertices block is visible before this line.
rankGraph.edges.foreachPartition(x => {}) // also materializes rankGraph.vertices
//logInfo(s"PageRank finished iteration $iteration.")
iteration += 1
我相信这 `(id, oldRank, msgSum) => alpha + (1.0 - alpha) * msgSum`