我在使用 graphx 将 mapReduceTriplets 应用到我的图形网络时遇到了一些问题。
我一直在关注教程并读入我自己的数据,这些数据以 [Array[String],Int] 的形式组合在一起,例如我的顶点是:
org.apache.spark.graphx.VertexRDD[Array[String]]
例如 (3999,Array(17, Low, 9))
我的优势是:
org.apache.spark.graphx.EdgeRDD[Int]
例如边缘(3999,4500,1)
我正在尝试使用 mapReduceTriplets 应用聚合类型函数,该函数计算顶点数组中的最后一个整数(在上面的示例 9 中)与第一个整数(在上面的示例 17 中)相同或不同的数量所有连接的顶点。
因此,您最终会得到一个匹配或不匹配数量的计数列表。
我遇到的问题是使用 mapReduceTriplets 应用任何函数,我对 scala 很陌生,所以这可能真的很明显,但是在 graphx 教程中,它有一个使用 Graph[Double, Int] 格式的图形的示例,但是我的图表采用 Graph[Array[String],Int] 的格式,所以我只是尝试作为第一步来弄清楚如何在示例中使用我的图表,然后从那里开始工作。
graphx网站上的例子如下:
val olderFollowers: VertexRDD[(Int, Double)] = graph.mapReduceTriplets[(Int, Double)](
triplet => { // Map Function
if (triplet.srcAttr > triplet.dstAttr) {
// Send message to destination vertex containing counter and age
Iterator((triplet.dstId, (1, triplet.srcAttr)))
} else {
// Don't send a message for this triplet
Iterator.empty
}
},
// Add counter and age
(a, b) => (a._1 + b._1, a._2 + b._2) // Reduce Function
)
任何建议将不胜感激,或者如果您认为有比使用 mapreducetriplets 更好的方法,我会很高兴听到它。
修改了新代码
val nodes = (sc.textFile("C~nodeData.csv")
.map(line => line.split(",")).map( parts => (parts.head.toLong, parts.tail) ))
val edges = GraphLoader.edgeListFile(sc, "C:~edges.txt")
val graph = edges.outerJoinVertices(nodes) {
case (uid, deg, Some(attrList)) => attrList
case (uid, deg, None) => Array.empty[String]
}
val countsRdd = graph.collectNeighbors(EdgeDirection.Either).leftOuterJoin(graph.vertices).map {
case (id, t) => {
val neighbors: Array[(VertexId, Array[String])] = t._1
val nodeAttr = (t._2)
neighbors.map(_._2).count( x => x.apply(x.size - 1) == nodeAttr(0))
}
}