2

我一直在使用图框,现在我正在使用聚合消息。顶点模式是:

 |-- id: long (nullable = false)
 |-- company: string (nullable = true)
 |-- money: integer (nullable = false)
 |-- memoryLearned: map (nullable = true)
 |    |-- key: string
 |    |-- value: integer (valueContainsNull = false)

如果我尝试一下:

  ...
 def createMessage(memory: org.apache.spark.sql.Column): org.apache.spark.sql.Column = {
    memory + 10
  }

...

val msgToSrc: org.apache.spark.sql.Column = this.createMessage(AM.dst("id"))

val aggregates = gx
        .aggregateMessages
        .sendToSrc(msgToSrc)
        .agg(sum(AM.msg).as("aggMess"))
aggregates.show()

它有效!但我需要从 memoryLearned 中获取键和值,所以我认为它有效:

...
     def createMessage(memory: org.apache.spark.sql.Column): org.apache.spark.sql.Column = {
        for((k,v) <- memory)
           ...
      }


...

val msgToSrc: org.apache.spark.sql.Column = this.createMessage(AM.dst("memoryLearned"))

val aggregates = gx
        .aggregateMessages
        .sendToSrc(msgToSrc)
        .agg(myUDFA(AM.msg).as("aggMess"))
aggregates.show()

我收到了这个错误:"value filter is not a member of org.apache.spark.sql.Column"

我试图搜索如何投射或获取 MapType,但我只找到使用数据框爆炸之类的功能,但我没有 df,我只有一列......

如果我把这个:memory.getItem("aKeyFromMap")而不是for(...,我从 Map 得到正确的值......

createMessage我还尝试在(一行和一列)中创建“aux”数据框以使用 df 函数,但是当我使用时.withColumn("newColumn",memory),它失败了..

我被阻止了..有什么想法吗?

非常感谢!!问候

4

1 回答 1

1

如果你想迭代MapType Column,并且你不知道前面的键,你必须使用UDF或对外部类型进行其他操作(如map):

import org.apache.spark.sql.functions.udf

def createMessage = udf( (memory: Map[String, Integer]) => {
  for( (k,v) <- memory )
  ...
} )

你得到:

我收到此错误:“值过滤器不是 org.apache.spark.sql.Column 的成员”

因为 for 理解是map/ flatMap/的语法糖filter

于 2018-01-31T12:03:35.040 回答