13

我想使用 Spark 的mllib.recommendation库来构建一个原型推荐系统。但是,我拥有的用户数据的格式是以下格式:

AB123XY45678
CD234WZ12345
EF345OOO1234
GH456XY98765
....

如果我想使用该mllib.recommendation库,根据Rating类的 API,用户 ID 必须是整数(还必须是连续的?)

看起来必须在真实用户 ID 和 Spark 使用的数字用户 ID 之间进行某种转换。但是我该怎么做呢?

4

4 回答 4

12

Spark 并不真正需要数字 id,它只需要一些唯一值,但为了实现,他们选择了 Int。

您可以对 userId 进行简单的来回转换:

  case class MyRating(userId: String, product: Int, rating: Double)

  val data: RDD[MyRating] = ???

  // Assign unique Long id for each userId
  val userIdToInt: RDD[(String, Long)] = 
    data.map(_.userId).distinct().zipWithUniqueId()

  // Reverse mapping from generated id to original
  val reverseMapping: RDD[(Long, String)]
    userIdToInt map { case (l, r) => (r, l) }

  // Depends on data size, maybe too big to keep
  // on single machine
  val map: Map[String, Int] = 
    userIdToInt.collect().toMap.mapValues(_.toInt)

  // Transform to MLLib rating
  val rating: RDD[Rating] = data.map { r =>
    Rating(userIdToInt.lookup(r.userId).head.toInt, r.product, r.rating)
    // -- or
    Rating(map(r.userId), r.product, r.rating)
  }

  // ... train model

  // ... get back to MyRating userId from Int

  val someUserId: String = reverseMapping.lookup(123).head

您也可以尝试“data.zipWithUniqueId()”,但我不确定在这种情况下 .toInt 是否会是安全的转换,即使数据集大小很小。

于 2015-01-05T04:44:59.327 回答
5

您需要跨用户标识运行 StringIndexer 以将字符串转换为唯一整数索引。它们不必是连续的。

我们在https://www.aihello.com中将其用于我们的项目推荐引擎

df 是(用户:字符串,产品,评级)

  val stringindexer = new StringIndexer()
      .setInputCol("user")
      .setOutputCol("userNumber")
  val modelc = stringindexer.fit(df)
  val  df = modelc.transform(df)
于 2017-01-28T00:17:45.733 回答
2

@Ganesh Krishnan 是对的,StringIndexer 解决了这个问题。

from pyspark.ml.feature import OneHotEncoder, StringIndexer
from pyspark.sql import SQLContext
>>> spark = SQLContext(sc)                                                                             
>>> df = spark.createDataFrame(
...     [(0, "a"), (1, "b"), (2, "c"), (3, "a"), (4, "a"), (5, "c")],
...     ["id", "category"])

| id|category|
+---+--------+
|  0|       a|
|  1|       b|
|  2|       c|
|  3|       a|
|  4|       a|
|  5|       c|
+---+--------+
>>> stringIndexer = StringIndexer(inputCol="category", outputCol="categoryIndex")
>>> model = stringIndexer.fit(df)
>>> indexed = model.transform(df)
>>> indexed.show()
+---+--------+-------------+
| id|category|categoryIndex|
+---+--------+-------------+
|  0|       a|          0.0|
|  1|       b|          2.0|
|  2|       c|          1.0|
|  3|       a|          0.0|
|  4|       a|          0.0|
|  5|       c|          1.0|
+---+--------+-------------+

>>> converter = IndexToString(inputCol="categoryIndex", outputCol="originalCategory")
>>> converted = converter.transform(indexed)
>>> converted.show()
+---+--------+-------------+----------------+
| id|category|categoryIndex|originalCategory|
+---+--------+-------------+----------------+
|  0|       a|          0.0|               a|
|  1|       b|          2.0|               b|
|  2|       c|          1.0|               c|
|  3|       a|          0.0|               a|
|  4|       a|          0.0|               a|
|  5|       c|          1.0|               c|
+---+--------+-------------+----------------+

>>> converted.select("id", "originalCategory").show()
+---+----------------+
| id|originalCategory|
+---+----------------+
|  0|               a|
|  1|               b|
|  2|               c|
|  3|               a|
|  4|               a|
|  5|               c|
+---+----------------+
于 2017-05-11T03:08:00.023 回答
1

上述解决方案可能并不总是像我发现的那样有效。Spark 无法从其他 RDD 中执行 RDD 转换。错误输出:

org.apache.spark.SparkException:RDD 转换和动作只能输入驱动程序调用的代码,不能在其他转换中输入;例如,rdd1.map(x => rdd2.values.count() * x) 是无效的,因为值转换和计数操作不能在 rdd1.map 转换内部执行。有关详细信息,请参阅 SPARK-5063。

作为一种解决方案,您可以将 userIdToInt RDD 与原始数据 RDD 连接起来,以存储 userId 和 uniqueId 之间的关系。然后稍后您可以再次将结果 RDD 与此 RDD 连接。

// Create RDD with the unique id included
val dataWithUniqueUserId: RDD[(String, Int, Int, Double)] = 
    data.keyBy(_.userId).join(userIdToInt).map(r => 
        (r._2._1.userId, r._2._2.toInt, r._2._1.productId, 1))
于 2015-07-13T14:33:37.580 回答