正如以利亚萨指出的那样,foreach
(and foreachRDD
) 不返回值;它们仅用于副作用。如果你想退货,你需要map
. 根据您的第二个解决方案:
val rmse = output.map(rdd => new RegressionMetrics(rdd).rootMeanSquaredError)
如果你为它做一个小功能,它看起来会更好:
val getRmse = (rdd: RDD) => new RegressionMetrics(rdd).rootMeanSquaredError
val rmse = output.map(getRmse)
忽略空 RDD,
val rmse = output.filter(_.nonEmpty).map(getRmse)
这是与 for-comprehension 完全相同的序列。它只是 map、flatMap 和 filter 的语法糖,但我认为当我第一次学习 Scala 时它更容易理解:
val rmse = for {
rdd <- output
if (rdd.nonEmpty)
} yield new RegressionMetrics(rdd).rootMeanSquaredError
这是一个汇总错误的函数,就像您的第一次尝试一样:
def calculateRmse(output: DStream[(Double, Double)]): Double = {
val getRmse = (rdd: RDD) => new RegressionMetrics(rdd).rootMeanSquaredError
output.filter(_.nonEmpty).map(getRmse).reduce(_+_)
}
编译器的抱怨nonEmpty
其实是DStream的filter
方法有问题。不是对 DStream 中的 RDD 进行操作,而是对DStream 的类型参数给出filter
的双精度对进行操作。(Double, Double)
我对 Spark 的了解不足以说这是一个缺陷,但它很奇怪。Filter
并且大多数其他对集合的操作通常是根据 foreach 定义的,但是 DStream 实现了这些功能而不遵循相同的约定;它不推荐使用的方法foreach
和当前方法foreachRDD
都在流的 RDD 上运行,但它的其他高阶方法没有。
所以我的方法行不通。DStream 可能有一个奇怪的理由(与性能有关?)这可能是一种不好的方法foreach
:
def calculateRmse(ds: DStream[(Double, Double)]): Double = {
var totalError: Double = 0
def getRmse(rdd:RDD[(Double, Double)]): Double = new RegressionMetrics(rdd).rootMeanSquaredError
ds.foreachRDD((rdd:RDD[(Double, Double)]) => if (!rdd.isEmpty) totalError += getRmse(rdd))
totalError
}
但它有效!