1

这是我计算均方根误差的函数。但是由于错误,最后一行无法编译Type mismatch issue (expected: Double, actual: Unit)。我尝试了许多不同的方法来解决这个问题,但仍然没有成功。有任何想法吗?

  def calculateRMSE(output: DStream[(Double, Double)]): Double = {
        val summse = output.foreachRDD { rdd =>
          rdd.map {
              case pair: (Double, Double) =>
                val err = math.abs(pair._1 - pair._2);
                err*err
          }.reduce(_ + _)
        }
        // math.sqrt(summse)  HOW TO APPLY SQRT HERE?
  }
4

2 回答 2

2

正如以利亚萨指出的那样,foreach(and foreachRDD) 不返回值;它们仅用于副作用。如果你想退货,你需要map. 根据您的第二个解决方案:

val rmse = output.map(rdd => new RegressionMetrics(rdd).rootMeanSquaredError)

如果你为它做一个小功能,它看起来会更好:

val getRmse = (rdd: RDD) => new RegressionMetrics(rdd).rootMeanSquaredError

val rmse = output.map(getRmse)

忽略空 RDD,

val rmse = output.filter(_.nonEmpty).map(getRmse)

这是与 for-comprehension 完全相同的序列。它只是 map、flatMap 和 filter 的语法糖,但我认为当我第一次学习 Scala 时它更容易理解:

val rmse = for {
  rdd <- output
  if (rdd.nonEmpty)
} yield new RegressionMetrics(rdd).rootMeanSquaredError

这是一个汇总错误的函数,就像您的第一次尝试一样:

def calculateRmse(output: DStream[(Double, Double)]): Double = {

val getRmse = (rdd: RDD) => new RegressionMetrics(rdd).rootMeanSquaredError

output.filter(_.nonEmpty).map(getRmse).reduce(_+_)
}

编译器的抱怨nonEmpty其实是DStream的filter方法有问题。不是对 DStream 中的 RDD 进行操作,而是对DStream 的类型参数给出filter的双精度对进行操作。(Double, Double)

我对 Spark 的了解不足以说这是一个缺陷,但它很奇怪。Filter并且大多数其他对集合的操作通常是根据 foreach 定义的,但是 DStream 实现了这些功能而不遵循相同的约定;它不推荐使用的方法foreach和当前方法foreachRDD都在流的 RDD 上运行,但它的其他高阶方法没有

所以我的方法行不通。DStream 可能有一个奇怪的理由(与性能有关?)这可能是一种不好的方法foreach

def calculateRmse(ds: DStream[(Double, Double)]): Double = {

  var totalError: Double = 0

  def getRmse(rdd:RDD[(Double, Double)]): Double = new RegressionMetrics(rdd).rootMeanSquaredError

  ds.foreachRDD((rdd:RDD[(Double, Double)]) => if (!rdd.isEmpty) totalError += getRmse(rdd))

  totalError
}

但它有效!

于 2016-05-02T22:18:15.003 回答
0

我设法按如下方式完成了这项任务:

import org.apache.spark.mllib.evaluation.RegressionMetrics

output.foreachRDD { rdd =>
  if (!rdd.isEmpty)
    {
      val metrics = new RegressionMetrics(rdd)
      val rmse = metrics.rootMeanSquaredError
      println("RMSE: " + rmse)
    }
}
于 2016-05-02T15:08:09.187 回答