-1

我只是从 GraphFrames 开始,虽然我正在关注文档,但我无法从 aggregateMessages 函数获得任何结果(它返回一个空数据帧)。这是我的问题的一个简化示例:我调用了 GraphFrames 对象testGraph,这样我的 vertexRDD 仅包含一个Y没有顶点属性的顶点,而我的 edgeRDD 包含如下两条记录:

| src | dst | min_ts1 | min_ts2 |
|  X  |  Y  |    20   |   null  |
|  Y  |  X  |   null  |   -10   |

现在,我想实现一个简单的算法,将值发送min_ts1dst,然后发送min_ts2src。我用来实现这个算法的代码是:

import org.graphframes.lib.AggregateMessages
import org.apache.spark.sql.functions._
val AM = AggregateMessages

val msgToSrc = AM.edge("min_ts2)
val msgToDst = AM.edge("min_ts1")

val delay = testGraph
.aggregateMessages
  .sendToSrc(msgToSrc)
  .sendToDst(msgToDst)  
  .agg(sum(AM.msg).as("avg_time_delay")) 

我意识到这里有一些空值,但无论如何我都希望消息传递算法执行以下操作:查看第一条记录,并发送to的消息和20toY的消息。然后查看第二条记录,发送一条to X 的消息和一条to的消息。最后,我希望结果显示消息的总和是,并且结果中没有记录,因为它不包含在 vertexRDD 中。如果包含在 vertexRDD 中,我希望结果是简单的,因为两条消息都是.nullXnull-10YY10XXnullnull

但是,我得到的是一个空的 RDD。有人可以帮我理解为什么我得到一个空的结果吗?

4

1 回答 1

0

好的,看来这个问题的原因确实是X我的 VertexRDD 中没有。我猜即使在我的 edgeRDD 中有进出该顶点的边并且我的聚合消息仅取决于边属性,该算法也无法发送这些消息。

于 2017-11-28T02:45:44.920 回答