2

我正在将一些Graph.pregel算法移植到GraphFrame.aggregateMessages. 我发现GraphFrameAPI 有点麻烦。

GraphAPI 中,我可以发送一个case class作为我的消息类型。但在GraphFrameAPI 中,aggregateMessages.sendToSrc可以.sendToDst使用 SQL 表达式StringColumn. 我发现这很强大,因为它是一个痛苦的屁股。

假设你有:

case class Vote(yay: Boolean, voters: Long = 1L)
case class Send(vote: Vote, from: Long)

使用GraphXandpregel函数,我可以构建一个sendMsg返回的函数,Iterator[(VertexId,Send)]它可能类似于:Iterator((1L, Send(Vote(yay = true), from = 2L) ))

GraphFrames我必须构建一个与Column具有相同目的的Iterator[(VertexId,Send)],理想情况下不完全放弃我已经定义的case classes(比上面的示例更复杂)。

有什么捷径可以做到这一点?

到目前为止我得到了什么:

case class将 a 的实例转换为相应的结构非常容易。这主要让我到达那里:

def ccToStruct(cc: Product) : Column = {
  val values = cc.productIterator
  var seq = Seq[Column]()
  while (values.hasNext) {
    val field = values.next() match {
      case p: Some[Product @unchecked] if (p.get.productArity > 0) => ccToStruct(p.get)
      case p: Product if (p.productArity > 0) => ccToStruct(p)
      case x => lit(x)
    }
    seq = seq :+ field
  }
  struct(seq:_*)
}

这让我可以:

ccToStruct(Send(Vote(true, 1L), 123L))
// res4: org.apache.spark.sql.Column = struct(struct(true,1),123)

我必须稍微修补一下架构以使其正常工作,但在我开始这样做之前,我意识到这是一种完全没用的方法。您永远不会真正想将case class值转换为struct--ccToStruct(Send(Vote(true, 1L), 123L))创建一个非常无用的消息。它相当于发送一个lit(Send(..))值——除了lit()不支持案例类。

您想要做的是将lit值与AM.dst("*")AM.src("*")列混合和匹配,但这样做与case class. (我曾想过完全放弃案例类,但我有一个消息,只要我继续使用案例类,这种逻辑就很容易移植。UDAFsum

我相信答案是能够创建这样的结构:

import org.graphframes.lib.AggregateMessages
val AM = AggregateMessages

val msg = Seq[Any](Seq[Any](true, 1L), AM.src("id"))

然后将其转换为我的案例类的Column使用struct()和模式。

如果没有人有更好的方法来做到这一点(甚至可能有人这样做),我稍后会用解决方案回答我自己的问题。

4

1 回答 1

0

这就是我想出的。

对于我想做的事情,即创建Column具有案例类结构但具有绑定能力的对象DataFrame.columns,我决定我的主要数据结构应该是Seq[Any]. Seq应该与我的案例类的结构相匹配——基本上Seq是案例类的构造函数参数。如果我的案例类是:

case class Vote(yay: Boolean, voters: Long)

然后我可以创建以下 Vote-like Seq

val voteSeq = Seq[Any](true, 1L)

但我必须使用 a 的原因Seq[Any]是因为更有趣的是,我可以创建:

val boundVote = Seq[Any](true, AM.edge("voters"))

我想出了几个可用于SeqColumn. 我Column用 SQL 函数创建struct()。您也可以使用 SQL 字符串表达式来完成这一切。但我决定Columns改用。我想让它尽可能干净,而 String SQL 表达式会变得混乱。

如果你没有在你的结构中正确命名你的列,你会得到看起来像这样的结构:

  vote: struct (nullable = false)
   |-- col1: boolean (nullable = false)
   |-- col2: long (nullable = false)

当您尝试将其转换为案例类时,这会很糟糕。相反,您必须使用as所有列,因此您得到:

  vote: struct (nullable = false)
   |-- yay: boolean (nullable = false)
   |-- voters: long (nullable = false)

解决方案是采用StructType并使用它来创建字段名称。事实证明,我已经涵盖了StructType从案例类中自动派生 a ——所以这是简单的部分。第一个函数完成了最困难的部分——它递归地遍历Seq模式和模式,并生成Columns最终被包裹在一个 final 中:struct(colSeq:_*)

def seqToColumnSchema(anySeq: Seq[Any], schema: StructType) : Column = {
  var colSeq = Seq[Column]()
  anySeq.zip(schema.fields).foreach{ case (value, field) => {
    colSeq = colSeq :+ (value match {
      case c: Column => c as field.name
      case p: Seq[Any] if (p.length > 0) => {
        field.dataType match {
          case s: StructType => seqToColumnSchema(p, s) as field.name
          case a: ArrayType => array(p.map(v => lit(v)):_*) as field.name
          case x => lit(x) as field.name
        }
      }
      case x => lit(x) as field.name
    })
  }}
  struct(colSeq:_*)
}

第二个函数只是第一个函数的包装器,但它可以让您执行以下操作:

seqToColumn[Vote](Seq(true, AM.edge("voters")))

不必提供StructType,您只需在[...]

import org.apache.spark.sql.catalyst.ScalaReflection    

def seqToColumn[T: TypeTag](anySeq: Seq[Any]) : Column = {
  val schema = ScalaReflection.schemaFor[T].dataType.asInstanceOf[StructType]
  seqToColumnSchema(anySeq, schema)  
}

所有这一切,只是为了让我可以做到这一点:

import org.graphframes.lib.AggregateMessages
val AM = AggregateMessages

case class Vote(yay: Boolean, voters: Long)

val voteSeq = Seq[Any](true, AM.edge("voters"))
val voteMsg = seqToColumn[Vote](voteSeq)
// voteMsg: org.apache.spark.sql.Column = struct(true AS yay#18,edge[voters] AS voters#19)

graphFrame.aggregateMessages.sendToDst(voteMsg).agg(voteSum(AM.msg) as "out").printSchema
root
 |-- id: long (nullable = false)
 |-- out: struct (nullable = true)
 |    |-- vote: struct (nullable = false)
 |    |    |-- yay: boolean (nullable = false)
 |    |    |-- voters: long (nullable = false)
于 2016-04-23T06:15:53.280 回答