6

我在 Spark 中有一个 RDD,其中的对象基于一个案例类:

ExampleCaseClass(user: User, stuff: Stuff)

我想使用 Spark 的 ML 管道,所以我将其转换为 Spark 数据帧。作为管道的一部分,我想将其中一列转换为条目为向量的列。由于我希望该向量的长度随模型而变化,因此它应该作为特征转换的一部分内置到管道中。

所以我试图定义一个 Transformer 如下:

class MyTransformer extends Transformer {

  val uid = ""
  val num: IntParam = new IntParam(this, "", "")

  def setNum(value: Int): this.type = set(num, value)
  setDefault(num -> 50)

  def transform(df: DataFrame): DataFrame = {
    ...
  }

  def transformSchema(schema: StructType): StructType = {
    val inputFields = schema.fields
    StructType(inputFields :+ StructField("colName", ???, true))
  }

  def copy (extra: ParamMap): Transformer = defaultCopy(extra)

}

如何指定结果字段的DataType(即填写???)?它将是某个简单类(Boolean、Int、Double 等)的 Vector。看起来 VectorUDT 可能有效,但这是 Spark 私有的。由于任何 RDD 都可以转换为 DataFrame,因此任何案例类都可以转换为自定义 DataType。但是我不知道如何手动进行这种转换,否则我可以将它应用于一些包装向量的简单案例类。

此外,如果我为列指定向量类型,当我去拟合模型时,VectorAssembler 是否会正确地将向量处理为单独的特征?

Spark 尤其是 ML Pipeline 仍然是新手,因此感谢任何建议。

4

2 回答 2

6
import org.apache.spark.ml.linalg.SQLDataTypes.VectorType  
def transformSchema(schema: StructType): StructType = {
  val inputFields = schema.fields
  StructType(inputFields :+ StructField("colName", VectorType, true))
}

在 spark 2.1 VectorType 中,VectorUDT 公开可用:

package org.apache.spark.ml.linalg

import org.apache.spark.annotation.{DeveloperApi, Since}
import org.apache.spark.sql.types.DataType

/**
 * :: DeveloperApi ::
 * SQL data types for vectors and matrices.
 */
@Since("2.0.0")
@DeveloperApi
object SQLDataTypes {

  /** Data type for [[Vector]]. */
  val VectorType: DataType = new VectorUDT

  /** Data type for [[Matrix]]. */
  val MatrixType: DataType = new MatrixUDT
}
于 2017-05-10T22:14:59.843 回答
4
import org.apache.spark.mllib.linalg.{Vector, Vectors}

case class MyVector(vector: Vector)
val vectorDF = Seq(
  MyVector(Vectors.dense(1.0,3.4,4.4)),
  MyVector(Vectors.dense(5.5,6.7))
).toDF

vectorDF.printSchema
root
 |-- vector: vector (nullable = true)

println(vectorDF.schema.fields(0).dataType.prettyJson)
{
  "type" : "udt",
  "class" : "org.apache.spark.mllib.linalg.VectorUDT",
  "pyClass" : "pyspark.mllib.linalg.VectorUDT",
  "sqlType" : {
    "type" : "struct",
    "fields" : [ {
      "name" : "type",
      "type" : "byte",
      "nullable" : false,
      "metadata" : { }
    }, {
      "name" : "size",
      "type" : "integer",
      "nullable" : true,
      "metadata" : { }
    }, {
      "name" : "indices",
      "type" : {
        "type" : "array",
        "elementType" : "integer",
        "containsNull" : false
      },
      "nullable" : true,
      "metadata" : { }
    }, {
      "name" : "values",
      "type" : {
        "type" : "array",
        "elementType" : "double",
        "containsNull" : false
      },
      "nullable" : true,
      "metadata" : { }
    } ]
  }
}
于 2016-03-18T03:54:57.357 回答