39

当我试图在我的代码中做同样的事情时,如下所述

dataframe.map(row => {
  val row1 = row.getAs[String](1)
  val make = if (row1.toLowerCase == "tesla") "S" else row1
  Row(row(0),make,row(2))
})

我从这里获取了上述参考: Scala: How can I replace value in Dataframs using scala 但是我收到编码器错误

找不到存储在数据集中的类型的编码器。通过导入 spark.implicits 支持原始类型(Int、S 字符串等)和产品类型(案例类)。未来版本中将添加对序列化其他类型的支持。

注意:我使用的是 spark 2.0!

4

4 回答 4

85

这里没有什么出乎意料的。您正在尝试使用使用 Spark 1.x 编写且 Spark 2.0 不再支持的代码:

  • 在 1.xDataFrame.map中是((Row) ⇒ T)(ClassTag[T]) ⇒ RDD[T]
  • 在 2.xDataset[Row].map中是((Row) ⇒ T)(Encoder[T]) ⇒ Dataset[T]

老实说,它在 1.x 中也没有多大意义。独立于版本,您可以简单地使用DataFrameAPI:

import org.apache.spark.sql.functions.{when, lower}

val df = Seq(
  (2012, "Tesla", "S"), (1997, "Ford", "E350"),
  (2015, "Chevy", "Volt")
).toDF("year", "make", "model")

df.withColumn("make", when(lower($"make") === "tesla", "S").otherwise($"make"))

如果你真的想使用map你应该使用静态类型Dataset

import spark.implicits._

case class Record(year: Int, make: String, model: String)

df.as[Record].map {
  case tesla if tesla.make.toLowerCase == "tesla" => tesla.copy(make = "S")
  case rec => rec
}

或至少返回一个具有隐式编码器的对象:

df.map {
  case Row(year: Int, make: String, model: String) => 
    (year, if(make.toLowerCase == "tesla") "S" else make, model)
}

最后,如果出于某种完全疯狂的原因你真的想映射Dataset[Row]你必须提供所需的编码器:

import org.apache.spark.sql.catalyst.encoders.RowEncoder
import org.apache.spark.sql.types._
import org.apache.spark.sql.Row

// Yup, it would be possible to reuse df.schema here
val schema = StructType(Seq(
  StructField("year", IntegerType),
  StructField("make", StringType),
  StructField("model", StringType)
))

val encoder = RowEncoder(schema)

df.map {
  case Row(year, make: String, model) if make.toLowerCase == "tesla" => 
    Row(year, "S", model)
  case row => row
} (encoder)
于 2016-09-11T06:48:51.773 回答
5

对于预先知道数据框架构的场景,@zero323 给出的答案是解决方案

但对于具有动态架构/或将多个数据帧传递给通用函数的场景:以下代码在从 1.6.1 从 2.2.0 迁移时对我们有用

import org.apache.spark.sql.Row

val df = Seq(
   (2012, "Tesla", "S"), (1997, "Ford", "E350"),
   (2015, "Chevy", "Volt")
 ).toDF("year", "make", "model")

val data = df.rdd.map(row => {
  val row1 = row.getAs[String](1)
  val make = if (row1.toLowerCase == "tesla") "S" else row1
  Row(row(0),make,row(2))
})

此代码在两个版本的 spark 上执行。

缺点:spark 在数据帧/数据集 api 上提供的优化不会被应用。

于 2018-01-19T07:17:53.123 回答
1

只是为了更好地理解其他答案(尤其是@zero323关于over的答案的最后一点)添加一些其他重要的知识点:mapDataset[Row]

  • 首先,Dataframe.map给你一个Dataset(更具体地说Dataset[T],,而不是Dataset[Row])!
  • 并且Dataset[T]总是需要一个编码器,这就是这句话“Dataset[Row].map((Row) ⇒ T)(Encoder[T]) ⇒ Dataset[T]”的意思。
  • Spark 已经预定义了很多编码器(可以通过 do 来import编辑import spark.implicits._),但是该列表仍然无法涵盖开发人员可能创建的许多特定于域的类型,在这种情况下,您需要自己创建编码器
  • 在此页面上的具体示例中,df.map返回一个Row类型 for Dataset,并稍等片刻,Row类型不在具有 Spark 预定义的编码器的类型列表中,因此您将自己创建一个。
  • 而且我承认为 type 创建编码器与上面链接Row中描述的方法有点不同,你必须使用which作为描述行类型的参数,就像上面 @zero323 提供的那样:RowEncoderStructType
// this describes the internal type of a row
val schema = StructType(Seq(StructField("year", IntegerType), StructField("make", StringType), StructField("model", StringType)))

// and this completes the creation of encoder
// for the type `Row` with internal schema described above
val encoder = RowEncoder(schema)
于 2020-10-03T10:36:01.187 回答
0

在我的 spark 2.4.4 版本中,我必须导入implicits。这是一个普遍的答案

val spark2 = spark
import spark2.implicits._

val data = df.rdd.map(row => my_func(row))

my_func 在哪里做了一些操作。

于 2019-11-12T23:34:53.210 回答