0

这个问题是关于 Spark GraphX 的。给定一个任意图,我想计算一个新图,它在任意两个节点 v、w 之间添加边,这两个节点都由某个节点 x 指向。新边应包含指向节点作为属性。

也就是说,给定边 (x, v, nil) 和 (x, w, nil) 计算边 (v, w, x) 和 (w, v, x)。

它应该适用于任何图形,并且不需要我事先了解有关图形的任何信息,例如顶点 ID。

例子

[任务] 当被同一节点(例如B)指向时,在节点(例如A,C)之间添加两条有向边。

输入图:

          ┌────┐
    ┌─────│ B  │──────┐
    │     └────┘      │
    v                 v
 ┌────┐            ┌────┐
 │ A  │            │ C  │
 └────┘            └────┘
    ^                 ^
    │     ┌────┐      │
    └─────│ D  │──────┘
          └────┘

输出图(双向边 = 两条有向边):

          ┌────┐
    ┌─────│ B  │──────┐
    │     └────┘      │
    v                 v
 ┌────┐<───by B───>┌────┐
 │ A  │            │ C  │
 └────┘<───by D───>└────┘
    ^                 ^
    │     ┌────┐      │
    └─────│ D  │──────┘
          └────┘

如何优雅地编写返回输出图的 GraphX 查询?

4

1 回答 1

0

这是一个使用 pregel 和聚合消息的解决方案

import org.apache.spark.graphx._
import org.apache.spark.rdd.RDD

// Step 0: Create an input graph.
val nodes =
  sc.parallelize(Array(
    (101L, "A"), (102L, "A"), (201L, "B"), (202L, "B")
  ))
val edges = 
  sc.parallelize(Array(
    Edge(201L, 101L, ("B-to-A", 0L)), Edge(201L, 102L, ("B-to-A", 0L)),
    Edge(202L, 101L, ("B-to-A", 0L)), Edge(202L, 102L, ("B-to-A", 0L))
  ))    
val graph = Graph(nodes, edges, "default")

// Step 1: Transform input graph before running pregel.
val initialGraph = graph.mapVertices((id, _) => Set[(VertexId,VertexId)]())

// Step 2: Send downstream vertex IDs (A's) to upstream vertices (B's)
val round1 = initialGraph.pregel(
  initialMsg=Set[(VertexId,VertexId)](), 
  maxIterations=1, 
  activeDirection=EdgeDirection.In) 
(
  (id, old, msg) => old.union(msg),
  triplet => Iterator((triplet.srcId, Set((triplet.dstId, triplet.srcId)))),
  (a,b) => a.union(b)
)

// Step 3: Send (gathered) IDs back to downstream vertices
val round2 = round1.aggregateMessages[Set[(VertexId,VertexId)]](
  triplet => {
    triplet.sendToDst(triplet.srcAttr)
  },
  (a, b) => a.union(b)
)

// Step 4: Transform vertices to edges
val newEdges = round2.flatMap {v => v._2.filter(w => w._1 != v._1).map(w => Edge(v._1, w._1, ("shares-with", w._2)))}

// Step 5: Create a new graph that contains new edges
val newGraph = Graph(graph.vertices, graph.edges.union(newEdges))

// Step 6: print graph to verify result
newGraph.triplets foreach println

该解决方案使用三个主要步骤来计算具有新边的图:1)一轮预凝胶。2) 一轮聚合消息。3)一轮映射节点到边。

于 2015-05-20T13:42:45.187 回答