4

Spark 2.3.0 与 Scala 2.11。我正在Aggregator根据此处的文档实施自定义。聚合器需要输入、缓冲区和输出 3 种类型。

我的聚合器必须对窗口中所有先前的行采取行动,所以我这样声明:

case class Foo(...)

object MyAggregator extends Aggregator[Foo, ListBuffer[Foo], Boolean] {
    // other override methods
    override def bufferEncoder: Encoder[ListBuffer[Mod]] = ???
}

其中一个覆盖方法应该返回缓冲区类型的编码器,在这种情况下是ListBuffer. 我找不到任何合适的编码器,org.apache.spark.sql.Encoders也找不到任何其他编码方式,所以我不知道在这里返回什么。

我想创建一个具有单一类型属性的新案例类,ListBuffer[Foo]并将其用作我的缓冲区类,然后Encoders.product在其上使用,但我不确定这是否有必要,或者是否还有其他我遗漏的东西。感谢您的任何提示。

4

2 回答 2

6

您应该让 Spark SQL 完成它的工作并使用以下方法找到合适的编码器ExpressionEncoder

scala> spark.version
res0: String = 2.3.0

case class Mod(id: Long)

import org.apache.spark.sql.Encoder
import scala.collection.mutable.ListBuffer
import org.apache.spark.sql.catalyst.encoders.ExpressionEncoder

scala> val enc: Encoder[ListBuffer[Mod]] = ExpressionEncoder()
enc: org.apache.spark.sql.Encoder[scala.collection.mutable.ListBuffer[Mod]] = class[value[0]: array<struct<id:bigint>>]
于 2018-05-04T21:22:57.367 回答
1

我在 org.apache.spark.sql.Encoders 中看不到任何可用于直接编码 ListBuffer 的内容,或者就此而言甚至是 List

正如您所建议的,一种选择似乎是将其放入案例类中:

import org.apache.spark.sql.Encoders

case class Foo(field: String)
case class Wrapper(lb: scala.collection.mutable.ListBuffer[Foo])
Encoders.product[Wrapper]

另一种选择可能是使用 kryo:

Encoders.kryo[scala.collection.mutable.ListBuffer[Foo]]

或者最后你可以看看 ExpressionEncoders,它扩展了 Encoder:

import org.apache.spark.sql.catalyst.encoders.ExpressionEncoder
ExpressionEncoder[scala.collection.mutable.ListBuffer[Foo]]

这是最好的解决方案,因为它使一切对催化剂透明,因此允许它进行所有精彩的优化。

我在玩游戏时注意到的一件事:

ExpressionEncoder[scala.collection.mutable.ListBuffer[Foo]].schema == ExpressionEncoder[List[Foo]].schema

在执行聚合时,我没有测试上述任何内容,因此可能存在运行时问题。希望这会有所帮助。

于 2018-04-08T18:09:26.003 回答