6

我有一组数据,我想根据这些数据创建分类模型。每行具有以下形式:

user1,class1,product1
user1,class1,product2
user1,class1,product5
user2,class1,product2
user2,class1,product5
user3,class2,product1

大约有 100 万用户、2 个类和 100 万种产品。我接下来想做的是创建稀疏向量(MLlib 已经支持的东西)但是为了应用该函数,我必须首先创建密集向量(带有 0)。换句话说,我必须对我的数据进行二值化。最简单(或最优雅)的方法是什么?

鉴于我是 MLlib 的新手,我可以请你提供一个具体的例子吗?我正在使用 MLlib 1.2。

编辑

我最终得到了以下代码,但结果确实很慢......提供了我只能使用 MLlib 1.2 的任何其他想法吗?

val data = test11.map(x=> ((x(0) , x(1)) , x(2))).groupByKey().map(x=> (x._1 , x._2.toArray)).map{x=>
  var lt : Array[Double] = new Array[Double](test12.size)
  val id = x._1._1
  val cl = x._1._2
  val dt = x._2
  var i = -1
  test12.foreach{y => i += 1; lt(i) = if(dt contains y) 1.0 else 0.0}
  val vs = Vectors.dense(lt)
  (id , cl , vs)
}
4

3 回答 3

9

您可以使用 spark.ml 的OneHotEncoder

您首先使用:

OneHotEncoder.categories(rdd, categoricalFields)

您包含分类数据categoricalField的索引序列在哪里。,给定一个数据集和作为分类变量的列的索引,返回一个结构,对于每个字段,该结构描述了数据集中存在的值。该映射旨在用作编码方法的输入:RDDcategories

OneHotEncoder.encode(rdd, categories)

这会返回您的矢量化RDD[Array[T]].

于 2015-08-07T08:40:30.387 回答
4

如果使用内置OneHotEncoder不是一种选择,并且您只有一个变量来实现穷人的单热,这或多或少是直截了当的。首先让我们创建一个示例数据:

import org.apache.spark.mllib.linalg.{Vector, Vectors}

val rdd = sc.parallelize(List(
    Array("user1", "class1", "product1"),
    Array("user1", "class1", "product2"),
    Array("user1", "class1", "product5"),
    Array("user2", "class1", "product2"),
    Array("user2", "class1", "product5"),
    Array("user3", "class2", "product1")))

接下来我们必须创建一个从值到索引的映射:

val prodMap = sc.broadcast(rdd.map(_(2)).distinct.zipWithIndex.collectAsMap)

和一个简单的编码功能:

def encodeProducts(products: Iterable[String]): Vector =  {
    Vectors.sparse(
        prodMap.value.size,
        products.map(product => (prodMap.value(product).toInt, 1.0)).toSeq
    )
}

最后,我们可以将其应用于数据集:

rdd.map(x => ((x(0), x(1)), x(2))).groupByKey.mapValues(encodeProducts)

在上面扩展以处理多个变量相对容易。

编辑

如果产品数量太大而无法使广播有用,则应该可以使用它join。首先,我们可以创建从产品到索引的类似映射,但将其保留为 RDD:

import org.apache.spark.HashPartitioner

val nPartitions = ???

val prodMapRDD = rdd
     .map(_(2))
     .distinct
     .zipWithIndex
     .partitionBy(new HashPartitioner(nPartitions))
     .cache

val nProducts = prodMapRDD.count // Should be < Int.MaxValue

接下来我们重塑输入RDDPairRDD按产品索引:

val pairs = rdd
    .map(rec => (rec(2), (rec(0), rec(1))))
    .partitionBy(new HashPartitioner(nPartitions))

最后我们都join可以

def indicesToVec(n: Int)(indices: Iterable[Long]): Vector = {
     Vectors.sparse(n, indices.map(x => (x.toInt, 1.0)).toSeq)
}

pairs.join(prodMapRDD)
   .values
   .groupByKey
   .mapValues(indicesToVec(nProducts.toInt))
于 2015-08-07T13:16:00.817 回答
-1

原始问题要求以最简单的方法从非分类中指定分类特征。

在 Spark ML 中,您可以使用 VectorIndexer 的 setMaxCategories 方法,您不必指定字段 - 相反,它会将基数小于或等于给定数字(在本例中为 2)的那些字段理解为分类字段。

val indexer = new VectorIndexer()
.setInputCol("features")
.setOutputCol("indexed")
.setMaxCategories(10)

详情请参阅此回复

于 2017-12-14T11:12:31.493 回答