You need to build a new graph from the union of the two graphs' vertices and edges, and then use groupEdges() to merge the duplicate (parallel) edges:
import org.apache.spark.graphx._
import org.apache.spark.graphx.PartitionStrategy.RandomVertexCut

// First graph: six vertices, five edges
val verts1 = sc.parallelize(Seq(
  (1L, "A"),
  (2L, "B"),
  (3L, "C"),
  (4L, "D"),
  (5L, "E"),
  (6L, "F")))
val edges1 = sc.parallelize(Seq(
  Edge(1L, 2L, 1),
  Edge(2L, 3L, 1),
  Edge(3L, 4L, 1),
  Edge(1L, 5L, 1),
  Edge(5L, 6L, 1)))
val graph1 = Graph(verts1, edges1)

// Second graph: shares vertices 1, 2, 3 and edges 1->2, 2->3 with the first
val verts2 = sc.parallelize(Seq(
  (1L, "A"),
  (2L, "B"),
  (3L, "C"),
  (7L, "G")))
val edges2 = sc.parallelize(Seq(
  Edge(1L, 2L, 1),
  Edge(2L, 3L, 1),
  Edge(1L, 7L, 1)))
val graph2 = Graph(verts2, edges2)

// Union the vertex and edge RDDs, then collapse duplicate edges.
// groupEdges only merges edges located in the same partition, so the
// graph has to be repartitioned with partitionBy first.
val graph: Graph[String, Int] = Graph(
  graph1.vertices.union(graph2.vertices),
  graph1.edges.union(graph2.edges)
).partitionBy(RandomVertexCut).
  groupEdges((attr1, attr2) => attr1 + attr2)
If you now look at the edges of the new graph, you can see the result of the merge:
scala> graph.edges.collect
res0: Array[org.apache.spark.graphx.Edge[Int]] =
Array(Edge(1,2,2), Edge(2,3,2), Edge(1,5,1),
Edge(5,6,1), Edge(1,7,1), Edge(3,4,1))
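If you need to do this for more than one pair of graphs, the same three steps (union, partitionBy, groupEdges) can be factored into a small helper. The sketch below is just that pattern wrapped in a function; the name mergeGraphs is not part of the GraphX API, and it assumes String vertex attributes, Int edge attributes, and that summing is the right way to combine parallel edges:

import org.apache.spark.graphx._
import org.apache.spark.graphx.PartitionStrategy.RandomVertexCut

// Hypothetical helper: union both RDDs, repartition, then collapse parallel edges
def mergeGraphs(g1: Graph[String, Int], g2: Graph[String, Int]): Graph[String, Int] =
  Graph(g1.vertices.union(g2.vertices), g1.edges.union(g2.edges)).
    partitionBy(RandomVertexCut).
    groupEdges(_ + _)

Note that the vertex union also contains vertices 1, 2 and 3 twice; the Graph constructor deduplicates vertex IDs on its own (picking one of the duplicate attributes arbitrarily), so only the parallel edges need the explicit groupEdges step.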