4

我在 Zeppelin 笔记本上使用 Spark,而 groupByKey() 似乎不起作用。

这段代码:

df.groupByKey(row => row.getLong(0))
  .mapGroups((key, iterable) => println(key))

给了我这个错误(可能是编译错误,因为它在我正在处理的数据集非常大时立即显示):

error: Unable to find encoder for type stored in a Dataset.  Primitive types (Int, String, etc) and Product types (case classes) are supported by importing spark.implicits._  Support for serializing other types will be added in future releases.

我尝试添加一个案例类并将我的所有行映射到其中,但仍然出现相同的错误

import spark.implicits._

case class DFRow(profileId: Long, jobId: String, state: String)

def getDFRow(row: Row):DFRow = {
    return DFRow(row.getLong(row.fieldIndex("item0")),
                 row.getString(row.fieldIndex("item1")), 
                 row.getString(row.fieldIndex("item2")))
}

df.map(DFRow(_))
  .groupByKey(row => row.getLong(0))
  .mapGroups((key, iterable) => println(key))

我的数据框的架构是:

root
|-- item0: long (nullable = true)
|-- item1: string (nullable = true)
|-- item2: string (nullable = true)
4

1 回答 1

6

您正在尝试mapGroups使用一个函数(Long, Iterator[Row]) => Unit并且没有Encoderfor Unit(并不是说拥有一个函数是有意义的)。

通常Dataset,不关注 SQL DSL ( DataFrame => DataFrame, DataFrame => RelationalGroupedDataset, RelationalGroupedDataset => DataFrame, RelationalGroupedDataset => RelationalGroupedDataset) 的 API 部分需要对输出值进行隐式或显式编码器。

由于对象没有预定义的编码器,因此对静态类型数据Row使用Dataset[Row]with 方法设计没有多大意义。根据经验,您应该始终首先转换为静态类型的变体:

df.as[(Long, String, String)]

另请参阅尝试将数据帧行映射到更新行时出现编码器错误

于 2016-09-15T18:59:01.983 回答