1

我正在尝试使用 Scala 在 Spark 中创建一些简单的自定义聚合运算符。

我创建了一个简单的运算符层次结构,具有以下超类:

sealed abstract class Aggregator(val name: String) {
  type Key = Row  // org.apache.spark.sql.Row
  type Value

  ...
}

我还有一个伴生对象,它每次都构造适当的聚合器。请注意,每个运算符都可以指定它想要的值类型。

现在的问题是当我尝试调用时combineByKey

val agg = Aggregator("SUM")
val res = rdd
    .map(agg.mapper)
    .reduceByKey(agg.reducer(_: agg.Value, _: agg.Value))

错误是:

value reduceByKey is not a member of org.apache.spark.rdd.RDD[(agg.Key, agg.Value)]

根据我的需要,Value可以是数字类型或元组,因此它没有边界定义。如果我将Value类型声明替换为:

type Value = Double

Aggregator课堂上,然后一切正常。因此,我认为该错误与不知道编译时reduceByKey的确切类型有关。Value

关于如何解决这个问题的任何想法?

4

1 回答 1

2

YourRDD不能被隐式转换为PairRDDFunctions,因为ClassTag键和值的所有隐式 s 都丢失了。

您可能希望将类标记作为隐式参数包含在您的Aggregator:

sealed abstract class Aggregator[K: ClassTag, V: ClassTag](name: String) {
  implicit val keyClassTag: ClassTag[K] = implicitly
  implicit val valueClassTag: ClassTag[V] = implicitly
}

或者可能:

sealed abstract class Aggregator[K, V](name: String)(implicit kt: ClassTag[K], vt: ClassTag[V]) {
  implicit val keyClassTag: ClassTag[K] = kt
  implicit val valueClassTag: ClassTag[V] = vt
}

甚至可能:

sealed abstract class Aggregator(name: String) {
  type K
  type V
  implicit def keyClassTag: ClassTag[K]
  implicit def valueClassTag: ClassTag[V]
}

最后一个变体将把提供ClassTags 的责任转移给抽象类的实现者。

现在,当在 aa中使用类型的聚合器时,您必须确保那些隐式提供的类标签在当前的隐式范围内:Aggregator[K, V]reduceByKey

val agg = Aggregator("SUM")
import agg._ // now the implicits should be visible
val res = rdd
.map(agg.mapper)
.reduceByKey(agg.reducer(_: agg.Value, _: agg.Value))
于 2018-05-13T22:01:02.733 回答