更新
Set
这个答案仍然有效且信息丰富,尽管自 2.2/2.3 以来情况有所好转,它增加了对、Seq
、Map
、Date
、Timestamp
和的内置编码器支持BigDecimal
。如果你坚持只使用 case 类和通常的 Scala 类型来制作类型,那么你应该只使用隐式 in 就可以了SQLImplicits
。
不幸的是,几乎没有添加任何东西来帮助解决这个问题。搜索或查找主要与原始类型有关的内容(以及对案例类的一些调整)@since 2.0.0
。所以,首先要说的是:目前对自定义类编码器没有真正好的支持。有了这些,接下来是一些技巧,考虑到我们目前可以使用的东西,这些技巧可以做得尽可能好。作为一个预先的免责声明:这不会完美地工作,我会尽我最大的努力使所有限制都清楚和预先。Encoders.scala
SQLImplicits.scala
究竟是什么问题
当你想创建一个数据集时,Spark“需要一个编码器(将类型 T 的 JVM 对象转换为内部 Spark SQL 表示),该编码器通常通过 a 的隐式自动创建SparkSession
,或者可以通过调用静态方法显式创建on Encoders
"(取自关于 的文档createDataset
)。编码器将采用您正在编码的类型的Encoder[T]
形式T
。第一个建议是添加import spark.implicits._
(它为您提供这些隐式编码器),第二个建议是使用这组编码器相关函数显式传入隐式编码器。
没有可用于常规课程的编码器,因此
import spark.implicits._
class MyObj(val i: Int)
// ...
val d = spark.createDataset(Seq(new MyObj(1),new MyObj(2),new MyObj(3)))
会给你以下隐式相关的编译时错误:
找不到存储在数据集中的类型的编码器。通过导入 sqlContext.implicits 支持原始类型(Int、String 等)和产品类型(案例类)。未来版本中将添加对序列化其他类型的支持
但是,如果您将刚刚用于获取上述错误的任何类型包装在扩展的某个类中Product
,则错误会令人困惑地延迟到运行时,因此
import spark.implicits._
case class Wrap[T](unwrap: T)
class MyObj(val i: Int)
// ...
val d = spark.createDataset(Seq(Wrap(new MyObj(1)),Wrap(new MyObj(2)),Wrap(new MyObj(3))))
编译得很好,但在运行时失败
java.lang.UnsupportedOperationException:未找到 MyObj 的编码器
原因是 Spark 使用隐式创建的编码器实际上仅在运行时生成(通过 scala relfection)。在这种情况下,Spark 在编译时的所有检查是最外层的类扩展Product
(所有案例类都这样做),并且只在运行时意识到它仍然不知道该怎么做MyObj
(如果我试图做同样的问题会发生a Dataset[(Int,MyObj)]
- Spark 等到运行时才开始MyObj
)。这些是急需解决的核心问题:
- 一些扩展
Product
编译的类,尽管总是在运行时崩溃,并且
- 没有办法为嵌套类型传递自定义编码器(我无法为 Spark 提供一个编码器,
MyObj
以便它知道如何编码Wrap[MyObj]
or (Int,MyObj)
)。
只需使用kryo
每个人都建议的解决方案是使用kryo
编码器。
import spark.implicits._
class MyObj(val i: Int)
implicit val myObjEncoder = org.apache.spark.sql.Encoders.kryo[MyObj]
// ...
val d = spark.createDataset(Seq(new MyObj(1),new MyObj(2),new MyObj(3)))
不过,这很快就会变得非常乏味。特别是如果您的代码正在操作各种数据集、连接、分组等。您最终会积累一堆额外的隐式。那么,为什么不直接隐式地自动完成这一切呢?
import scala.reflect.ClassTag
implicit def kryoEncoder[A](implicit ct: ClassTag[A]) =
org.apache.spark.sql.Encoders.kryo[A](ct)
现在,似乎我几乎可以做任何我想做的事情(下面的例子在自动导入的spark-shell
地方不起作用)spark.implicits._
class MyObj(val i: Int)
val d1 = spark.createDataset(Seq(new MyObj(1),new MyObj(2),new MyObj(3)))
val d2 = d1.map(d => (d.i+1,d)).alias("d2") // mapping works fine and ..
val d3 = d1.map(d => (d.i, d)).alias("d3") // .. deals with the new type
val d4 = d2.joinWith(d3, $"d2._1" === $"d3._1") // Boom!
或者差不多。问题是使用kryo
导致 Spark 只是将数据集中的每一行存储为一个平面二进制对象。对于map
, filter
,foreach
这就足够了,但是对于像join
,Spark 之类的操作确实需要将它们分成列。检查d2
or的架构d3
,您会看到只有一个二进制列:
d2.printSchema
// root
// |-- value: binary (nullable = true)
元组的部分解决方案
因此,使用 Scala 中隐含的魔力(更多内容在6.26.3 重载解决方案中),我可以为自己制作一系列隐含,这些隐含将尽可能好地完成工作,至少对于元组而言,并且可以很好地与现有的隐含一起工作:
import org.apache.spark.sql.{Encoder,Encoders}
import scala.reflect.ClassTag
import spark.implicits._ // we can still take advantage of all the old implicits
implicit def single[A](implicit c: ClassTag[A]): Encoder[A] = Encoders.kryo[A](c)
implicit def tuple2[A1, A2](
implicit e1: Encoder[A1],
e2: Encoder[A2]
): Encoder[(A1,A2)] = Encoders.tuple[A1,A2](e1, e2)
implicit def tuple3[A1, A2, A3](
implicit e1: Encoder[A1],
e2: Encoder[A2],
e3: Encoder[A3]
): Encoder[(A1,A2,A3)] = Encoders.tuple[A1,A2,A3](e1, e2, e3)
// ... you can keep making these
然后,有了这些隐式,我可以让我上面的例子工作,尽管有一些列重命名
class MyObj(val i: Int)
val d1 = spark.createDataset(Seq(new MyObj(1),new MyObj(2),new MyObj(3)))
val d2 = d1.map(d => (d.i+1,d)).toDF("_1","_2").as[(Int,MyObj)].alias("d2")
val d3 = d1.map(d => (d.i ,d)).toDF("_1","_2").as[(Int,MyObj)].alias("d3")
val d4 = d2.joinWith(d3, $"d2._1" === $"d3._1")
我还没有弄清楚如何在不重命名的情况下默认获取预期的元组名称 ( _1
, _2
, ...) - 如果其他人想玩这个,这是引入名称"value"
的地方,也是元组的地方通常会添加名称。然而,关键是我现在有一个很好的结构化模式:
d4.printSchema
// root
// |-- _1: struct (nullable = false)
// | |-- _1: integer (nullable = true)
// | |-- _2: binary (nullable = true)
// |-- _2: struct (nullable = false)
// | |-- _1: integer (nullable = true)
// | |-- _2: binary (nullable = true)
因此,总而言之,此解决方法:
- 允许我们为元组获取单独的列(所以我们可以再次加入元组,耶!)
- 我们可以再次依赖隐式(所以不需要到处传递
kryo
)
- 几乎完全向后兼容
import spark.implicits._
(涉及一些重命名)
- 不允许我们加入序列化的二进制列,更不用说
kyro
那些可能有的字段了
- 具有将一些元组列重命名为“值”的令人不快的副作用(如有必要,可以通过转换
.toDF
、指定新列名和转换回数据集来撤消此操作 - 并且模式名称似乎通过连接保留,最需要它们的地方)。
一般类的部分解决方案
这个不太愉快,也没有好的解决方案。但是,既然我们有了上面的元组解决方案,我预感来自另一个答案的隐式转换解决方案也不会那么痛苦,因为您可以将更复杂的类转换为元组。然后,在创建数据集之后,您可能会使用数据框方法重命名列。如果一切顺利,这确实是一个进步,因为我现在可以在我的类的字段上执行连接。如果我只使用一个平面二进制kryo
序列化器,那是不可能的。
这是一个做所有事情的例子:我有一个类MyObj
,它的字段类型Int
为 、java.util.UUID
和Set[String]
。第一个照顾自己。第二个,虽然我可以序列化 usingkryo
如果存储为 a 会更有用String
(因为UUID
s 通常是我想要加入的东西)。第三个真的只是属于二进制列。
class MyObj(val i: Int, val u: java.util.UUID, val s: Set[String])
// alias for the type to convert to and from
type MyObjEncoded = (Int, String, Set[String])
// implicit conversions
implicit def toEncoded(o: MyObj): MyObjEncoded = (o.i, o.u.toString, o.s)
implicit def fromEncoded(e: MyObjEncoded): MyObj =
new MyObj(e._1, java.util.UUID.fromString(e._2), e._3)
现在,我可以使用这种机器创建一个具有良好模式的数据集:
val d = spark.createDataset(Seq[MyObjEncoded](
new MyObj(1, java.util.UUID.randomUUID, Set("foo")),
new MyObj(2, java.util.UUID.randomUUID, Set("bar"))
)).toDF("i","u","s").as[MyObjEncoded]
架构向我展示了具有正确名称的列和前两个我可以加入的东西。
d.printSchema
// root
// |-- i: integer (nullable = false)
// |-- u: string (nullable = true)
// |-- s: binary (nullable = true)