48

如何使用 spark-ml 和 不 spark-mllib处理分类数据?

认为文档不是很清楚,似乎分类器例如RandomForestClassifier, LogisticRegression, 有一个featuresCol参数,它指定 中的特征列的名称DataFrame,以及一个labelCol参数,它指定 中的标记类列的名称DataFrame

显然我想在我的预测中使用多个特征,所以我尝试使用 将VectorAssembler我的所有特征放在一个向量中featuresCol

但是,VectorAssembler唯一接受数字类型、布尔类型和向量类型(根据 Spark 网站),所以我不能将字符串放入我的特征向量中。

我应该如何进行?

4

5 回答 5

58

我只是想完成霍尔顿的回答。

Spark 2.3.0以来,OneHotEncoder已弃用并将在3.0.0. 请OneHotEncoderEstimator改用。

斯卡拉

import org.apache.spark.ml.Pipeline
import org.apache.spark.ml.feature.{OneHotEncoderEstimator, StringIndexer}

val df = Seq((0, "a", 1), (1, "b", 2), (2, "c", 3), (3, "a", 4), (4, "a", 4), (5, "c", 3)).toDF("id", "category1", "category2")

val indexer = new StringIndexer().setInputCol("category1").setOutputCol("category1Index")
val encoder = new OneHotEncoderEstimator()
  .setInputCols(Array(indexer.getOutputCol, "category2"))
  .setOutputCols(Array("category1Vec", "category2Vec"))

val pipeline = new Pipeline().setStages(Array(indexer, encoder))

pipeline.fit(df).transform(df).show
// +---+---------+---------+--------------+-------------+-------------+
// | id|category1|category2|category1Index| category1Vec| category2Vec|
// +---+---------+---------+--------------+-------------+-------------+
// |  0|        a|        1|           0.0|(2,[0],[1.0])|(4,[1],[1.0])|
// |  1|        b|        2|           2.0|    (2,[],[])|(4,[2],[1.0])|
// |  2|        c|        3|           1.0|(2,[1],[1.0])|(4,[3],[1.0])|
// |  3|        a|        4|           0.0|(2,[0],[1.0])|    (4,[],[])|
// |  4|        a|        4|           0.0|(2,[0],[1.0])|    (4,[],[])|
// |  5|        c|        3|           1.0|(2,[1],[1.0])|(4,[3],[1.0])|
// +---+---------+---------+--------------+-------------+-------------+

Python中:

from pyspark.ml import Pipeline
from pyspark.ml.feature import StringIndexer, OneHotEncoderEstimator

df = spark.createDataFrame([(0, "a", 1), (1, "b", 2), (2, "c", 3), (3, "a", 4), (4, "a", 4), (5, "c", 3)], ["id", "category1", "category2"])

indexer = StringIndexer(inputCol="category1", outputCol="category1Index")
inputs = [indexer.getOutputCol(), "category2"]
encoder = OneHotEncoderEstimator(inputCols=inputs, outputCols=["categoryVec1", "categoryVec2"])
pipeline = Pipeline(stages=[indexer, encoder])
pipeline.fit(df).transform(df).show()
# +---+---------+---------+--------------+-------------+-------------+
# | id|category1|category2|category1Index| categoryVec1| categoryVec2|
# +---+---------+---------+--------------+-------------+-------------+
# |  0|        a|        1|           0.0|(2,[0],[1.0])|(4,[1],[1.0])|
# |  1|        b|        2|           2.0|    (2,[],[])|(4,[2],[1.0])|
# |  2|        c|        3|           1.0|(2,[1],[1.0])|(4,[3],[1.0])|
# |  3|        a|        4|           0.0|(2,[0],[1.0])|    (4,[],[])|
# |  4|        a|        4|           0.0|(2,[0],[1.0])|    (4,[],[])|
# |  5|        c|        3|           1.0|(2,[1],[1.0])|(4,[3],[1.0])|
# +---+---------+---------+--------------+-------------+-------------+

Spark 1.4.0开始,MLLib 还提供OneHotEncoder功能,该功能将一列标签索引映射到一列二进制向量,最多只有一个单值。

这种编码允许期望连续特征的算法(例如逻辑回归)使用分类特征

让我们考虑以下内容DataFrame

val df = Seq((0, "a"),(1, "b"),(2, "c"),(3, "a"),(4, "a"),(5, "c"))
            .toDF("id", "category")

第一步是使用以下内容创建DataFrame索引StringIndexer

import org.apache.spark.ml.feature.StringIndexer

val indexer = new StringIndexer()
                   .setInputCol("category")
                   .setOutputCol("categoryIndex")
                   .fit(df)

val indexed = indexer.transform(df)

indexed.show
// +---+--------+-------------+                                                    
// | id|category|categoryIndex|
// +---+--------+-------------+
// |  0|       a|          0.0|
// |  1|       b|          2.0|
// |  2|       c|          1.0|
// |  3|       a|          0.0|
// |  4|       a|          0.0|
// |  5|       c|          1.0|
// +---+--------+-------------+

然后,您可以对categoryIndexwith进行编码OneHotEncoder

import org.apache.spark.ml.feature.OneHotEncoder

val encoder = new OneHotEncoder()
                   .setInputCol("categoryIndex")
                   .setOutputCol("categoryVec")

val encoded = encoder.transform(indexed)

encoded.select("id", "categoryVec").show
// +---+-------------+
// | id|  categoryVec|
// +---+-------------+
// |  0|(2,[0],[1.0])|
// |  1|    (2,[],[])|
// |  2|(2,[1],[1.0])|
// |  3|(2,[0],[1.0])|
// |  4|(2,[0],[1.0])|
// |  5|(2,[1],[1.0])|
// +---+-------------+
于 2015-08-28T19:35:42.260 回答
49

我将从另一个角度提供答案,因为我还想知道 Spark ML(不是 MLlib)中基于树的模型的分类特征,并且文档并不清楚一切是如何工作的。

当您使用pyspark.ml.feature.StringIndexer额外的元数据转换数据框中的列时,会存储在数据框中,专门将转换后的特征标记为分类特征。

当您打印数据框时,您将看到一个数值(这是与您的分类值之一相对应的索引),如果您查看架构,您将看到新转换的列的类型为double. 但是,您创建的这个新列pyspark.ml.feature.StringIndexer.transform不仅仅是一个普通的双列,它还有与之关联的额外元数据,这非常重要。您可以通过查看数据框架构中相应字段的属性来检查此元数据metadata(您可以通过查看 yourdataframe.schema 来访问数据框的架构对象)

这个额外的元数据有两个重要的含义:

  1. 当您.fit()在使用基于树的模型时调用时,它将扫描数据帧的元数据并识别您使用转换器编码为分类的字段,例如pyspark.ml.feature.StringIndexer(如上所述,还有其他转换器也会产生这种效果,例如pyspark.ml.feature.VectorIndexer)。因此,在 spark ML 中使用基于树的模型时,在使用 StringIndxer 转换特征后,您不必对特征进行 one-hot 编码(但是,在使用其他模型时,您仍然必须执行 one-hot 编码自然地处理分类,如线性回归等)。

  2. 由于此元数据存储在数据框中,因此您可以随时使用pyspark.ml.feature.IndexToString将数字索引反转回原始分类值(通常是字符串)。

于 2016-11-15T16:56:44.663 回答
7

有一个称为 ML 管道的组件,StringIndexer您可以使用它以合理的方式将字符串转换为 Double。http://spark.apache.org/docs/latest/api/scala/index.html#org.apache.spark.ml.feature.StringIndexer有更多文档,http://spark.apache.org/docs/ latest/ml-guide.html展示了如何构建管道。

于 2015-08-28T18:42:56.613 回答
0

我使用以下方法对 Spark dataFrame 中的单个列进行 oneHotEncoding:

def ohcOneColumn(df, colName, debug=False):

  colsToFillNa = []

  if debug: print("Entering method ohcOneColumn")
  countUnique = df.groupBy(colName).count().count()
  if debug: print(countUnique)

  collectOnce = df.select(colName).distinct().collect()
  for uniqueValIndex in range(countUnique):
    uniqueVal = collectOnce[uniqueValIndex][0]
    if debug: print(uniqueVal)
    newColName = str(colName) + '_' + str(uniqueVal) + '_TF'
    df = df.withColumn(newColName, df[colName]==uniqueVal)
    colsToFillNa.append(newColName)
  df = df.drop(colName)
  df = df.na.fill(False, subset=colsToFillNa)
  return df

我对 oneHotEncoding Spark dataFrames 使用以下方法:

from pyspark.sql.functions import col, countDistinct, approxCountDistinct
from pyspark.ml.feature import StringIndexer
from pyspark.ml.feature import OneHotEncoderEstimator

def detectAndLabelCat(sparkDf, minValCount=5, debug=False, excludeCols=['Target']):
  if debug: print("Entering method detectAndLabelCat")
  newDf = sparkDf
  colList = sparkDf.columns

  for colName in sparkDf.columns:
    uniqueVals = sparkDf.groupBy(colName).count()
    if debug: print(uniqueVals)
    countUnique = uniqueVals.count()
    dtype = str(sparkDf.schema[colName].dataType)
    #dtype = str(df.schema[nc].dataType)
    if (colName in excludeCols):
      if debug: print(str(colName) + ' is in the excluded columns list.')

    elif countUnique == 1:
      newDf = newDf.drop(colName)
      if debug:
        print('dropping column ' + str(colName) + ' because it only contains one unique value.')
      #end if debug
    #elif (1==2):
    elif ((countUnique < minValCount) | (dtype=="String") | (dtype=="StringType")):
      if debug: 
        print(len(newDf.columns))
        oldColumns = newDf.columns
      newDf = ohcOneColumn(newDf, colName, debug=debug)
      if debug: 
        print(len(newDf.columns))
        newColumns = set(newDf.columns) - set(oldColumns)
        print('Adding:')
        print(newColumns)
        for newColumn in newColumns:
          if newColumn in newDf.columns:
            try:
              newUniqueValCount = newDf.groupBy(newColumn).count().count()
              print("There are " + str(newUniqueValCount) + " unique values in " + str(newColumn))
            except:
              print('Uncaught error discussing ' + str(newColumn))
          #else:
          #  newColumns.remove(newColumn)

        print('Dropping:')
        print(set(oldColumns) - set(newDf.columns))

    else:
      if debug: print('Nothing done for column ' + str(colName))

      #end if countUnique == 1, elif countUnique other condition
    #end outer for
  return newDf
于 2019-07-17T17:58:29.077 回答
-2

您可以使用 cast 函数将 spark 数据框中的字符串列类型转换为数值数据类型。

from pyspark.sql import SQLContext
from pyspark.sql.types import DoubleType, IntegerType

sqlContext = SQLContext(sc)
dataset = sqlContext.read.format('com.databricks.spark.csv').options(header='true').load('./data/titanic.csv')   

dataset = dataset.withColumn("Age", dataset["Age"].cast(DoubleType()))
dataset = dataset.withColumn("Survived", dataset["Survived"].cast(IntegerType()))

在上面的示例中,我们将 csv 文件作为数据帧读入,将默认字符串数据类型转换为整数和双精度,并覆盖原始数据帧。然后,我们可以使用 VectorAssembler 将特征合并到一个向量中,并应用您最喜欢的 Spark ML 算法。

于 2017-05-27T23:20:19.557 回答