Spark 2.3.0 与 Scala 2.11。我正在Aggregator
根据此处的文档实施自定义。聚合器需要输入、缓冲区和输出 3 种类型。
我的聚合器必须对窗口中所有先前的行采取行动,所以我这样声明:
case class Foo(...)
object MyAggregator extends Aggregator[Foo, ListBuffer[Foo], Boolean] {
// other override methods
override def bufferEncoder: Encoder[ListBuffer[Mod]] = ???
}
其中一个覆盖方法应该返回缓冲区类型的编码器,在这种情况下是ListBuffer
. 我找不到任何合适的编码器,org.apache.spark.sql.Encoders
也找不到任何其他编码方式,所以我不知道在这里返回什么。
我想创建一个具有单一类型属性的新案例类,ListBuffer[Foo]
并将其用作我的缓冲区类,然后Encoders.product
在其上使用,但我不确定这是否有必要,或者是否还有其他我遗漏的东西。感谢您的任何提示。