
I am trying to implement random walk with restart by modifying the Spark GraphX implementation of the PageRank algorithm.

    import org.apache.spark.graphx._

    // Note: patientID and tol are not used yet; this is still plain static PageRank.
    def randomWalkWithRestart(
        graph: Graph[VertexProperty, EdgeProperty],
        patientID: String,
        numIter: Int = 10,
        alpha: Double = 0.15,
        tol: Double = 0.01): Graph[Double, Double] = {

      var rankGraph: Graph[Double, Double] = graph
        // Associate the out-degree with each vertex
        .outerJoinVertices(graph.outDegrees) { (vid, vdata, deg) => deg.getOrElse(0) }
        // Set the weight on the edges based on the out-degree of the source
        .mapTriplets(e => 1.0 / e.srcAttr, TripletFields.Src)
        // Set the vertex attributes to the initial PageRank values
        .mapVertices((id, attr) => alpha)

      var iteration = 0
      var prevRankGraph: Graph[Double, Double] = null
      while (iteration < numIter) {
        rankGraph.cache()

        // Compute the outgoing rank contributions of each vertex, perform local preaggregation,
        // and do the final aggregation at the receiving vertices. Requires a shuffle.
        val rankUpdates = rankGraph.aggregateMessages[Double](
          ctx => ctx.sendToDst(ctx.srcAttr * ctx.attr), _ + _, TripletFields.Src)

        // Apply the final rank updates to get the new ranks, using join to preserve ranks of
        // vertices that didn't receive a message. Requires a shuffle for broadcasting updated
        // ranks to the edge partitions.
        prevRankGraph = rankGraph
        rankGraph = rankGraph.joinVertices(rankUpdates) {
          (id, oldRank, msgSum) => alpha + (1.0 - alpha) * msgSum
        }.cache()

        rankGraph.edges.foreachPartition(x => {}) // also materializes rankGraph.vertices
        prevRankGraph.vertices.unpersist(false)
        prevRankGraph.edges.unpersist(false)

        iteration += 1
      }

      // Return the ranked graph so the result is not discarded
      rankGraph
    }

I believe the (id, oldRank, msgSum) => alpha + (1.0 - alpha) * msgSum part should change, but I am not sure how. I need to add the steady-state probabilities to this line.

In addition, the steady-state probabilities should be initialized somewhere before the while loop, and they must be updated inside the while loop.
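To make the intended change concrete, here is a minimal sketch of one possible direction, not a verified solution. It assumes the usual random-walk-with-restart update p <- alpha * e_s + (1 - alpha) * W^T p, where e_s is 1 at the source vertex and 0 elsewhere. The object name RandomWalkWithRestart, the method run, and the sourceId parameter are hypothetical; mapping the patientID string to a VertexId is left out. The probabilities are initialized before the loop with all mass on the source, and only the source receives the restart term inside the loop.

```scala
import scala.reflect.ClassTag
import org.apache.spark.graphx._

object RandomWalkWithRestart {
  // Hypothetical helper: sourceId is assumed to be the VertexId of the start vertex.
  def run[VD: ClassTag, ED: ClassTag](
      graph: Graph[VD, ED],
      sourceId: VertexId,
      numIter: Int = 10,
      alpha: Double = 0.15): Graph[Double, Double] = {

    var rankGraph: Graph[Double, Double] = graph
      // Attach the out-degree to each vertex
      .outerJoinVertices(graph.outDegrees) { (_, _, deg) => deg.getOrElse(0) }
      // Edge weight = 1 / out-degree of the source vertex
      .mapTriplets(e => 1.0 / e.srcAttr, TripletFields.Src)
      // Initial probabilities: all mass on the source vertex
      .mapVertices((id, _) => if (id == sourceId) 1.0 else 0.0)
      .cache()

    var iteration = 0
    while (iteration < numIter) {
      // Each vertex pushes its probability along its out-edges
      val rankUpdates = rankGraph.aggregateMessages[Double](
        ctx => ctx.sendToDst(ctx.srcAttr * ctx.attr), _ + _, TripletFields.Src)

      val prevRankGraph = rankGraph
      // outerJoinVertices so that vertices without incoming messages are reset
      // to the restart term alone instead of keeping their old value
      rankGraph = rankGraph.outerJoinVertices(rankUpdates) { (id, _, msgSum) =>
        val restart = if (id == sourceId) alpha else 0.0
        restart + (1.0 - alpha) * msgSum.getOrElse(0.0)
      }.cache()

      rankGraph.edges.foreachPartition(x => {}) // materializes the new graph
      prevRankGraph.vertices.unpersist(false)
      prevRankGraph.edges.unpersist(false)
      iteration += 1
    }
    rankGraph
  }
}
```

The sketch swaps joinVertices for outerJoinVertices on purpose: with joinVertices, a vertex that receives no message keeps its previous value, which does not match the update rule assumed above. Vertices with no outgoing edges still lose probability mass in this version, and the tol parameter from the original function is not handled.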

Any suggestions would be appreciated.
