我在这个网站上找到了下面的代码: https ://spark.apache.org/docs/2.3.1/ml-tuning.html

// Note that the evaluator here is a BinaryClassificationEvaluator and its default metric
// is areaUnderROC.
val cv = new CrossValidator()
  .setEvaluator(new BinaryClassificationEvaluator)
  .setNumFolds(2)  // Use 3+ in practice
  .setParallelism(2)  // Evaluate up to 2 parameter settings in parallel

正如他们所说,BinaryClassificationEvaluator 的默认指标是“AUC”。如何将此默认指标更改为 F1 分数?


val cv = new CrossValidator()
  .setEvaluator(new BinaryClassificationEvaluator.setMetricName("f1"))
  .setNumFolds(2)  // Use 3+ in practice
  .setParallelism(2)  // Evaluate up to 2 parameter settings in parallel



import org.apache.spark.ml.evaluation.Evaluator
import org.apache.spark.ml.param.ParamMap
import org.apache.spark.ml.param.shared.{HasLabelCol, HasPredictionCol}
import org.apache.spark.ml.util.Identifiable
import org.apache.spark.sql.types.IntegerType
import org.apache.spark.sql.{Dataset, functions => F}

class FScoreEvaluator(override val uid: String) extends Evaluator with HasPredictionCol with HasLabelCol{

  def this() = this(Identifiable.randomUID("FScoreEvaluator"))

  def evaluate(dataset: Dataset[_]): Double = {
    val truePositive = F.sum(((F.col(getLabelCol) === 1) && (F.col(getPredictionCol) === 1)).cast(IntegerType))
    val predictedPositive = F.sum((F.col(getPredictionCol) === 1).cast(IntegerType))
    val actualPositive = F.sum((F.col(getLabelCol) === 1).cast(IntegerType))

    val precision = truePositive / predictedPositive
    val recall = truePositive / actualPositive
    val fScore = F.lit(2) * (precision * recall) / (precision + recall)


  override def copy(extra: ParamMap): Evaluator = defaultCopy(extra)
基于@gmds 的回答。确保 Spark 版本 >=2.3。

您还可以按照Spark 中 RegressionEvaluator 的实现来实现其他自定义评估器。

我还添加了isLargerBetter以便实例化的评估器可以用于模型选择(例如 CV)

import org.apache.spark.ml.evaluation.Evaluator
import org.apache.spark.ml.param.ParamMap
import org.apache.spark.ml.param.shared.{HasLabelCol, HasPredictionCol, HasWeightCol}
import org.apache.spark.ml.util.Identifiable
import org.apache.spark.sql.types.IntegerType
import org.apache.spark.sql.{Dataset, functions => F}

class WRmseEvaluator(override val uid: String) extends Evaluator with HasPredictionCol with HasLabelCol with HasWeightCol {

    def this() = this(Identifiable.randomUID("wrmseEval"))

    def setPredictionCol(value: String): this.type = set(predictionCol, value)
    def setLabelCol(value: String): this.type = set(labelCol, value)
    def setWeightCol(value: String): this.type = set(weightCol, value)
    def evaluate(dataset: Dataset[_]): Double = {
            .withColumn("residual", F.col(getLabelCol) - F.col(getPredictionCol))
                F.sqrt(F.sum(F.col(getWeightCol) * F.pow(F.col("residual"), 2)) / F.sum(getWeightCol))


    override def copy(extra: ParamMap): Evaluator = defaultCopy(extra)

    override def isLargerBetter: Boolean = false


val wrmseEvaluator = new WRmseEvaluator()

