我正在尝试在其元素是简单案例类的实例的 RDD 上执行 groupBy,但我遇到了一个奇怪的错误,我不知道如何解决。以下代码在 Spark-shell(Spark 0.9.0、Scala 2.10.3、Java 1.7.0)中重现了该问题:
case class EmployeeRec( name : String, position : String, salary : Double ) extends Serializable;
// I suspect extends Serializable is not needed for case classes, but just in case...
val data = sc.parallelize( Vector( EmployeeRec("Ana", "Analist", 200 ),
EmployeeRec("Maria", "Manager", 250.0 ),
EmployeeRec("Paul", "Director", 300.0 ) ) )
val groupFun = ( emp : EmployeeRec ) => emp.position
val dataByPos = data.groupBy( groupFun )
最后一条语句产生的错误是:
val dataByPos = data.groupBy( groupFun )
<console>:21: error: type mismatch;
found : EmployeeRec => String
required: EmployeeRec => ?
val dataByPos = data.groupBy( groupFun )
所以我尝试了:
val dataByPos = data.groupBy[String]( groupFun )
这个错误现在有点可怕:
val dataByPos = data.groupBy[String]( groupFun )
<console>:18: error: overloaded method value groupBy with alternatives:
(f: EmployeeRec => String,p: org.apache.spark.Partitioner)(implicit evidence$8: scala.reflect.ClassTag[String])org.apache.spark.rdd.RDD[(String, Seq[EmployeeRec])] <and>
(f: EmployeeRec => String,numPartitions: Int)(implicit evidence$7: scala.reflect.ClassTag[String])org.apache.spark.rdd.RDD[(String, Seq[EmployeeRec])] <and>
(f: EmployeeRec => String)(implicit evidence$6: scala.reflect.ClassTag[String])org.apache.spark.rdd.RDD[(String, Seq[EmployeeRec])]
cannot be applied to (EmployeeRec => String)
val dataByPos = data.groupBy[String]( groupFun )
我试图通过添加额外的参数 numPartions = 10 来更具体地了解我想要应用的重载方法 groupBy 的版本(当然,我的真实数据集远大于 3 条记录)
val dataByPos = data.groupBy[String]( groupFun, 10 )
我得到与以前完全相同的错误。
有任何想法吗?我怀疑这个问题可能与隐含的证据论点有关……不幸的是,这是我不太了解的 scala 领域之一。
注 1:此代码的模拟使用元组而不是案例类 EmployeeRec,没有任何问题。但是,我希望能够使用案例类而不是元组来获得更好、更易于维护的代码,这些代码不需要我按位置而不是按名称来记住或处理字段(实际上我每个员工有超过 3 个字段.)
注意 2:似乎观察到的这个问题(使用案例类 EmployeeRec 时)可能在 Spark 1.+ 中得到修复,因为在使用 spark-core_2.10 时,上述代码的任何版本都由 eclipse scala 插件正确编译-1.0.0-cdh5.1.0.jar。但是,我不确定如何或是否能够在我有权访问的集群中运行该版本的 Spark,我希望能更好地理解问题,以便为 Spark 0.9 提供解决方法。 0