0

所以我是 Scala 的新手,刚开始使用 RDD 和函数式 Scala 操作。

我试图迭代我的​​ Pair RDD的值,并通过应用定义的函数返回Var1存储的值的平均值,以便最终返回是 Var1 的唯一列表,每个列表都有一个关联。我在弄清楚如何迭代这些值时遇到了很多麻烦。 Var2averageAvgVar2

*编辑:我有以下类型声明:

case class ID: Int,  Var1: Int, Var2: Int extends Serializable

我有以下功能:

  def foo(rdds: RDD[(ID, Iterable[(Var1, Var2)])]): RDD[(Var1, AvgVar2)] = {

    def average(as: Array[Var2]): AvgVar2 = {
       var sum = 0.0
       var i = 0.0
       while (i < as.length) {
           sum += Var2.val
           i += 1
      }
      sum/i
    }

    //My attempt at Scala
    rdds.map(x=> ((x._1),x._2)).groupByKey().map(x=>average(x._1)).collect()
}

我在 Scala 的尝试是尝试执行以下操作:

  1. 将 RDD 对 Iterable 拆分为Var1-Var2.
  2. 按 的键分组Var1并创建关联的数组Var2
  3. 将我的average函数应用于每个数组Var2
  4. AvgVar2将关联Var1的作为 RDD 的集合返回

*编辑:

一些示例输入数据rdds

//RDD[(ID,Iterable[(Var1,Var2)...])]
RDD[(1,[(1,3),(1,12),(1,6)])],
RDD[(2,[(2,5),(2,7)])]

一些示例输出数据:

//RDD[(Var1, AvgVar2)]
RDD[(1,7),(2,6)]

*编辑:工作scala代码行:

rdd.map(x => (x._2.map(it => it._1).asInstanceOf[Var1], average(x._2.map(it => it._2).toArray)))
4

1 回答 1

1

考虑ID= Var1,一个简单的.map()将解决它:

def foo(rdds: RDD[(Int, Iterable[(Int, Int)])]): RDD[(Int, Double)] = {

  def average(as: Iterable[(Int, Int)]): Double = {
    as.map(_._2).reduce(_+_)/as.size.toDouble
  }

  rdds.map(x => (x._1, average(x._2)))
}

输出:

val input = sc.parallelize(List((1,Iterable((1,3),(1,12),(1,6))), (2, Iterable((2,5),(2,7)))))

scala> foo(input).collect
res0: Array[(Int, Double)] = Array((1,7.0), (2,6.0))

编辑:(average()具有相同的签名):

def foo(rdds: RDD[(Int, Iterable[(Int, Int)])]): RDD[(Int, Double)] = {

  def average(as: Array[Int]): Double = {
    as.reduce(_+_)/as.size.toDouble
  }

  rdds.map(x => (x._1, average(x._2.map(tuple => tuple._2).toArray)))
}
于 2019-02-01T18:58:39.540 回答