52

一旦我在 Spark 中获得了一些 Row 类,无论是 Dataframe 还是 Catalyst,我想在我的代码中将它转换为 case 类。这可以通过匹配来完成

someRow match {case Row(a:Long,b:String,c:Double) => myCaseClass(a,b,c)}

但是当行有大量列时,它会变得很难看,比如十几个双精度数、一些布尔值,甚至是偶尔的空值。

我希望能够 - 抱歉 - 将 Row 转换为 myCaseClass。是否有可能,或者我已经获得了最经济的语法?

4

4 回答 4

40

DataFrame 只是 Dataset[Row] 的类型别名。这些操作也称为“非类型化转换”,与强类型化 Scala/Java 数据集附带的“类型化转换”相反。

在 spark 中从 Dataset[Row] 到 Dataset[Person] 的转换非常简单

val DFtoProcess = SQLContext.sql("SELECT * FROM peoples WHERE name='test'")

此时,Spark 会将您的数据转换为 DataFrame = Dataset[Row],这是一个通用 Row 对象的集合,因为它不知道确切的类型。

// Create an Encoders for Java class (In my eg. Person is a JAVA class)
// For scala case class you can pass Person without .class reference
val personEncoder = Encoders.bean(Person.class) 

val DStoProcess = DFtoProcess.as[Person](personEncoder)

现在,SparkDataset[Row] -> Dataset[Person]按照 Person 类的要求转换特定类型的 Scala / Java JVM 对象。

有关详细信息,请参阅 databricks 提供的以下链接

https://databricks.com/blog/2016/07/14/a-tale-of-three-apache-spark-apis-rdds-dataframes-and-datasets.html

于 2016-08-20T12:24:51.280 回答
26

据我所知,您不能将 Row 强制转换为案例类,但我有时会选择直接访问行字段,例如

map(row => myCaseClass(row.getLong(0), row.getString(1), row.getDouble(2))

我发现这更容易,特别是如果案例类构造函数只需要行中的一些字段。

于 2015-09-02T08:02:27.210 回答
21
scala> import spark.implicits._    
scala> val df = Seq((1, "james"), (2, "tony")).toDF("id", "name")
df: org.apache.spark.sql.DataFrame = [id: int, name: string]

scala> case class Student(id: Int, name: String)
defined class Student

scala> df.as[Student].collectAsList
res6: java.util.List[Student] = [Student(1,james), Student(2,tony)]

这里sparkspark.implicits._你的SparkSession。如果您在 REPL 中,则会话已经定义,spark否则您需要相应地调整名称以对应于您的SparkSession.

于 2017-12-25T00:57:22.117 回答
7

当然,您可以将 Row 对象匹配到案例类中。假设您的 SchemaType 有许多字段,并且您希望将其中一些字段匹配到您的案例类中。如果您没有空字段,您可以简单地执行以下操作:

case class MyClass(a: Long, b: String, c: Int, d: String, e: String)

dataframe.map {
  case Row(a: java.math.BigDecimal, 
    b: String, 
    c: Int, 
    _: String,
    _: java.sql.Date, 
    e: java.sql.Date,
    _: java.sql.Timestamp, 
    _: java.sql.Timestamp, 
    _: java.math.BigDecimal, 
    _: String) => MyClass(a = a.longValue(), b = b, c = c, d = d.toString, e = e.toString)
}

这种方法在空值的情况下会失败,并且还需要您明确定义每个字段的类型。如果您必须处理空值,则需要通过执行丢弃所有包含空值的行

dataframe.na.drop()

即使空字段不是在您的案例类的模式匹配中使用的字段,这也会删除记录。或者,如果您想处理它,您可以将 Row 对象转换为 List ,然后使用选项模式:

case class MyClass(a: Long, b: String, c: Option[Int], d: String, e: String)

dataframe.map(_.toSeq.toList match {
  case List(a: java.math.BigDecimal, 
    b: String, 
    c: Int, 
    _: String,
    _: java.sql.Date, 
    e: java.sql.Date,
    _: java.sql.Timestamp, 
    _: java.sql.Timestamp, 
    _: java.math.BigDecimal, 
    _: String) => MyClass(
      a = a.longValue(), b = b, c = Option(c), d = d.toString, e = e.toString)
}

查看这个 github 项目 Sparkz (),它很快就会引入许多库来简化 Spark 和 DataFrame API,并使它们更加面向函数式编程。

于 2015-11-02T17:24:10.883 回答