10

你们知道在哪里可以找到 Spark 中多类分类的示例吗?我花了很多时间在书籍和网络上搜索,到目前为止我只知道根据文档的最新版本是可能的。

4

2 回答 2

28

机器学习

建议在 Spark 2.0+ 中使用

我们将使用与下面 MLlib 中相同的数据。有两个基本选项。如果Estimator支持开箱即用的多类分类(例如随机森林),您可以直接使用它:

val trainRawDf = trainRaw.toDF

import org.apache.spark.ml.feature.{Tokenizer, CountVectorizer, StringIndexer}
import org.apache.spark.ml.Pipeline

import org.apache.spark.ml.classification.RandomForestClassifier

val transformers = Array(
  new StringIndexer().setInputCol("group").setOutputCol("label"),
  new Tokenizer().setInputCol("text").setOutputCol("tokens"),
  new CountVectorizer().setInputCol("tokens").setOutputCol("features")
)


val rf = new RandomForestClassifier() 
  .setLabelCol("label")
  .setFeaturesCol("features")

val model = new Pipeline().setStages(transformers :+ rf).fit(trainRawDf)

model.transform(trainRawDf)

如果模型仅支持二元分类(逻辑回归)并扩展o.a.s.ml.classification.Classifier,您可以使用 one-vs-rest 策略:

import org.apache.spark.ml.classification.OneVsRest
import org.apache.spark.ml.classification.LogisticRegression

val lr = new LogisticRegression() 
  .setLabelCol("label")
  .setFeaturesCol("features")

val ovr = new OneVsRest().setClassifier(lr)

val ovrModel = new Pipeline().setStages(transformers :+ ovr).fit(trainRawDf)

MLLib

根据目前的官方文档(MLlib 1.6.0),以下方法支持多类分类:

  • 逻辑回归,
  • 决策树,
  • 随机森林,
  • 朴素贝叶斯

至少有一些例子使用了多类分类:

忽略方法特定参数的通用框架与 MLlib 中的所有其他方法几乎相同。您必须预处理您的输入以创建包含表示label和的列的任一数据框features

root
 |-- label: double (nullable = true)
 |-- features: vector (nullable = true)

RDD[LabeledPoint]

Spark 提供了广泛的有用工具,旨在促进这一过程,包括特征提取器特征转换器和管道

您会在下面找到一个使用随机森林的相当幼稚的示例。

首先让我们导入所需的包并创建虚拟数据:

import sqlContext.implicits._
import org.apache.spark.ml.feature.{HashingTF, Tokenizer} 
import org.apache.spark.mllib.regression.LabeledPoint
import org.apache.spark.ml.feature.StringIndexer
import org.apache.spark.mllib.tree.RandomForest
import org.apache.spark.mllib.tree.model.RandomForestModel
import org.apache.spark.mllib.linalg.{Vectors, Vector}
import org.apache.spark.mllib.evaluation.MulticlassMetrics
import org.apache.spark.sql.Row
import org.apache.spark.rdd.RDD

case class LabeledRecord(group: String, text: String)

val trainRaw = sc.parallelize(
    LabeledRecord("foo", "foo v a y b  foo") ::
    LabeledRecord("bar", "x bar y bar v") ::
    LabeledRecord("bar", "x a y bar z") ::
    LabeledRecord("foobar", "foo v b bar z") ::
    LabeledRecord("foo", "foo x") ::
    LabeledRecord("foobar", "z y x foo a b bar v") ::
    Nil
)

现在让我们定义所需的变压器和流程列车Dataset

// Tokenizer to process text fields
val tokenizer = new Tokenizer()
    .setInputCol("text")
    .setOutputCol("words")

// HashingTF to convert tokens to the feature vector
val hashingTF = new HashingTF()
    .setInputCol("words")
    .setOutputCol("features")
    .setNumFeatures(10)

// Indexer to convert String labels to Double
val indexer = new StringIndexer()
    .setInputCol("group")
    .setOutputCol("label")
    .fit(trainRaw.toDF)


def transfom(rdd: RDD[LabeledRecord]) = {
    val tokenized = tokenizer.transform(rdd.toDF)
    val hashed = hashingTF.transform(tokenized)
    val indexed = indexer.transform(hashed)
    indexed
        .select($"label", $"features")
        .map{case Row(label: Double, features: Vector) =>
            LabeledPoint(label, features)}
}

val train: RDD[LabeledPoint] = transfom(trainRaw)

请注意,这indexer是“拟合”在火车数据上的。它只是意味着用作标签的分类值被转换为doubles. 要在新数据上使用分类器,您必须先使用 this 对其进行转换indexer

接下来我们可以训练 RF 模型:

val numClasses = 3
val categoricalFeaturesInfo = Map[Int, Int]()
val numTrees = 10
val featureSubsetStrategy = "auto"
val impurity = "gini"
val maxDepth = 4
val maxBins = 16

val model = RandomForest.trainClassifier(
    train, numClasses, categoricalFeaturesInfo, 
    numTrees, featureSubsetStrategy, impurity,
    maxDepth, maxBins
)

最后测试它:

val testRaw = sc.parallelize(
    LabeledRecord("foo", "foo  foo z z z") ::
    LabeledRecord("bar", "z bar y y v") ::
    LabeledRecord("bar", "a a  bar a z") ::
    LabeledRecord("foobar", "foo v b bar z") ::
    LabeledRecord("foobar", "a foo a bar") ::
    Nil
)

val test: RDD[LabeledPoint] = transfom(testRaw)

val predsAndLabs = test.map(lp => (model.predict(lp.features), lp.label))
val metrics = new MulticlassMetrics(predsAndLabs)

metrics.precision
metrics.recall
于 2015-08-16T17:25:41.750 回答
-2

您使用的是 Spark 1.6 而不是 Spark 2.1?我认为问题在于,在 spark 2.1 中,transform 方法返回一个数据集,该数据集可以隐式转换为类型化的 RDD,而在此之前,它返回一个数据帧或行。

尝试将转换函数的返回类型指定为 RDD[LabeledPoint] 作为诊断,看看是否得到相同的错误。

于 2017-08-03T19:13:16.650 回答