我只是从 GraphFrames 开始,虽然我正在关注文档,但我无法从 aggregateMessages 函数获得任何结果(它返回一个空数据帧)。这是我的问题的一个简化示例:我调用了 GraphFrames 对象testGraph
,这样我的 vertexRDD 仅包含一个Y
没有顶点属性的顶点,而我的 edgeRDD 包含如下两条记录:
| src | dst | min_ts1 | min_ts2 |
| X | Y | 20 | null |
| Y | X | null | -10 |
现在,我想实现一个简单的算法,将值发送min_ts1
到dst
,然后发送min_ts2
到src
。我用来实现这个算法的代码是:
import org.graphframes.lib.AggregateMessages
import org.apache.spark.sql.functions._
val AM = AggregateMessages
val msgToSrc = AM.edge("min_ts2)
val msgToDst = AM.edge("min_ts1")
val delay = testGraph
.aggregateMessages
.sendToSrc(msgToSrc)
.sendToDst(msgToDst)
.agg(sum(AM.msg).as("avg_time_delay"))
我意识到这里有一些空值,但无论如何我都希望消息传递算法执行以下操作:查看第一条记录,并发送to的消息和20
toY
的消息。然后查看第二条记录,发送一条to X 的消息和一条to的消息。最后,我希望结果显示消息的总和是,并且结果中没有记录,因为它不包含在 vertexRDD 中。如果包含在 vertexRDD 中,我希望结果是简单的,因为两条消息都是.null
X
null
-10
Y
Y
10
X
X
null
null
但是,我得到的是一个空的 RDD。有人可以帮我理解为什么我得到一个空的结果吗?