我正在尝试使用 Scala 在 Spark 中创建一些简单的自定义聚合运算符。
我创建了一个简单的运算符层次结构,具有以下超类:
sealed abstract class Aggregator(val name: String) {
type Key = Row // org.apache.spark.sql.Row
type Value
...
}
我还有一个伴生对象,它每次都构造适当的聚合器。请注意,每个运算符都可以指定它想要的值类型。
现在的问题是当我尝试调用时combineByKey
:
val agg = Aggregator("SUM")
val res = rdd
.map(agg.mapper)
.reduceByKey(agg.reducer(_: agg.Value, _: agg.Value))
错误是:
value reduceByKey is not a member of org.apache.spark.rdd.RDD[(agg.Key, agg.Value)]
根据我的需要,Value
可以是数字类型或元组,因此它没有边界定义。如果我将Value
类型声明替换为:
type Value = Double
在Aggregator
课堂上,然后一切正常。因此,我认为该错误与不知道编译时reduceByKey
的确切类型有关。Value
关于如何解决这个问题的任何想法?