获取与特定顶点相关的连接组件可以使用 BFS 遍历来完成,该遍历从该顶点开始并在几个跃点上收集其所有邻居。这可以通过 GraphX 提供的 Pregel API 简单地完成,我们应该在其中实现一个 vertexProgram、sendMessage 和 mergeMessages 函数。该算法在接收到初始消息时触发。中心向其邻居发送一条消息,该消息将其传播给他们的邻居,依此类推,直到覆盖连接的组件。每个接收到 msg 的顶点都会被检查,这样它就不会在接下来的迭代中被激活。
这是这种方法的实现:
import org.apache.spark.graphx._
import org.apache.spark.{SparkConf, SparkContext}
object ConnectedComponent extends Serializable {
def main(args = Array[String]) = {
val conf = new SparkConf().setAppName("ConnectedComponent").setMaster("local")
val sc = new SparkContext(conf)
val vRDD = sc.objectFile[(VertexId,Int)]("/path/to/vertex/rdd/file/")
val eRDD = sc.objectFile[Edge[Int]]("/path/to/edge/rdd/file/")
val graph = Graph(vRDD, eRDD)
val centerOfCC = graph.pickRandomVertex()
var cc = extractCC(graph, center)
cc.vertices.collect.foreach(println)
sc.stop()
}
def extractCC(g: Graph[Int, Int], center: VertexId): Graph[Int, Int] = {
/* Return a subgraph of the input graph containing 'center' with the connected component
*/
val initialGraph = g.mapVertices((id, attr) => VertexData(attr, false, false, center))
val connectedComponent = initialGraph.pregel(initialMsg = 0)(vprog, sendMsg, mergeMsgs)
.subgraph(vpred = (id, attr) => attr.checked == true)
.mapVertices((id, vdata) => vdata.attr)
connectedComponent
}
case class VertexData( var attr : Int, // label of the vertex
var checked : Boolean, // check visited vertices
var propagate : Boolean, // allow forwarding msgs or not
var center: VertexId) // ID of the connectedComponent center
def vprog(id:VertexId, vdata: VertexData, msg: Int): VertexData = {
val attr : Int = vdata.attr
var checked : Boolean = vdata.checked
var propagate : Boolean = vdata.propagate
val center : VertexId = vdata.center
if (checked==false && msg == 0 && id==center) {
propagate = true
checked = true
}
else if(checked==false && msg == 1) {
propagate = true
checked = true
}
else if(checked == true && msg == 1){
propagate = false
}
new VertexData(attr, checked, propagate, center)
}
def sendMsg(triplet: EdgeTriplet[VertexData, Int]):Iterator[(VertexId, Int)] = {
var it : Iterator[(VertexId, Int)] = Iterator()
if(triplet.dstAttr.propagate==true)
it = it ++ Iterator((triplet.srcId, 1))
if(triplet.srcAttr.propagate==true)
it = it ++ Iterator((triplet.dstId, 1))
it
}
def mergeMsgs(a: Int, b: Int): Int = math.max(a, b)
}