我正在将一些Graph.pregel
算法移植到GraphFrame.aggregateMessages
. 我发现GraphFrame
API 有点麻烦。
在Graph
API 中,我可以发送一个case class
作为我的消息类型。但在GraphFrame
API 中,aggregateMessages.sendToSrc
可以.sendToDst
使用 SQL 表达式String
或Column
. 我发现这很强大,因为它是一个痛苦的屁股。
假设你有:
case class Vote(yay: Boolean, voters: Long = 1L)
case class Send(vote: Vote, from: Long)
使用GraphX
andpregel
函数,我可以构建一个sendMsg
返回的函数,Iterator[(VertexId,Send)]
它可能类似于:Iterator((1L, Send(Vote(yay = true), from = 2L) ))
GraphFrames
我必须构建一个与Column
具有相同目的的Iterator[(VertexId,Send)]
,理想情况下不完全放弃我已经定义的case classes
(比上面的示例更复杂)。
有什么捷径可以做到这一点?
到目前为止我得到了什么:
case class
将 a 的实例转换为相应的结构非常容易。这主要让我到达那里:
def ccToStruct(cc: Product) : Column = {
val values = cc.productIterator
var seq = Seq[Column]()
while (values.hasNext) {
val field = values.next() match {
case p: Some[Product @unchecked] if (p.get.productArity > 0) => ccToStruct(p.get)
case p: Product if (p.productArity > 0) => ccToStruct(p)
case x => lit(x)
}
seq = seq :+ field
}
struct(seq:_*)
}
这让我可以:
ccToStruct(Send(Vote(true, 1L), 123L))
// res4: org.apache.spark.sql.Column = struct(struct(true,1),123)
我必须稍微修补一下架构以使其正常工作,但在我开始这样做之前,我意识到这是一种完全没用的方法。您永远不会真正想将case class
值转换为struct
--ccToStruct(Send(Vote(true, 1L), 123L))
创建一个非常无用的消息。它相当于发送一个lit(Send(..))
值——除了lit()
不支持案例类。
您想要做的是将lit
值与AM.dst("*")
和AM.src("*")
列混合和匹配,但这样做与case class
. (我曾想过完全放弃案例类,但我有一个消息,只要我继续使用案例类,这种逻辑就很容易移植。UDAF
)sum
我相信答案是能够创建这样的结构:
import org.graphframes.lib.AggregateMessages
val AM = AggregateMessages
val msg = Seq[Any](Seq[Any](true, 1L), AM.src("id"))
然后将其转换为我的案例类的Column
使用struct()
和模式。
如果没有人有更好的方法来做到这一点(甚至可能有人这样做),我稍后会用解决方案回答我自己的问题。