Spark graphFrames 文档有一个很好的示例如何应用聚合消息功能。
对我来说,它似乎只计算单个顶点和第一个顶点的朋友/连接,而不是作为 graphXs pregel 运算符更深入地迭代到图中。
我如何在graphFrames中完成这样的迭代,以及使用类似于这里如何处理迭代的聚合消息https://github.com/sparkling-graph/sparkling-graph/blob/master/operators/src/main/scala/ml/ graphX 中的sparkling/graph/operators/measures/vertex/eigenvector/ EigenvectorCentrality.scala?
import org.graphframes.examples import org.graphframes.lib.AggregateMessages val g: GraphFrame = examples.Graphs.friends // get example graph // We will use AggregateMessages utilities later, so name it "AM" for short. val AM = AggregateMessages // For each user, sum the ages of the adjacent users. val msgToSrc = AM.dst("age") val msgToDst = AM.src("age") val agg = g.aggregateMessages .sendToSrc(msgToSrc) // send destination user's age to source .sendToDst(msgToDst) // send source user's age to destination .agg(sum(AM.msg).as("summedAges")) // sum up ages, stored in AM.msg column agg.show()
http://graphframes.github.io/user-guide.html#message-passing-via-aggregatemessages