
I have a dataset of (user, product, review), and want to feed it into mllib's ALS algorithm.

The algorithm needs users and products to be numbers, while mine are String usernames and String SKUs.

Right now, I get the distinct users and SKUs, then assign numeric IDs to them outside of Spark.

I was wondering whether there was a better way of doing this. The one approach I've thought of is to write a custom RDD that essentially enumerates 1 through n, then call zip on the two RDDs.


5 Answers


Starting with Spark 1.0 there are two methods you can use to solve this easily:

  • RDD.zipWithIndex is just like Seq.zipWithIndex, it adds contiguous (Long) numbers. This needs to count the elements in each partition first, so your input will be evaluated twice. Cache your input RDD if you want to use this.
  • RDD.zipWithUniqueId also gives you unique Long IDs, but they are not guaranteed to be contiguous. (They will only be contiguous if each partition has the same number of elements.) The upside is that this does not need to know anything about the input, so it will not cause double-evaluation. A short sketch of both methods follows this list.
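For illustration, a minimal Scala sketch of both methods (the reviews RDD name and its (username, sku, rating) layout are assumptions, not from the question):

// reviews: RDD[(String, String, Double)] of (username, sku, rating), an assumed input
val users = reviews.map(_._1).distinct().cache()  // cache, since zipWithIndex evaluates the RDD an extra time
val userIds = users.zipWithIndex()                // RDD[(String, Long)] with contiguous IDs 0..n-1
val userUniqueIds = users.zipWithUniqueId()       // unique Long IDs, not necessarily contiguous
val userIntIds = userIds.mapValues(_.toInt)       // ALS wants Ints; safe while there are fewer than Int.MaxValue users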
Answered 2014-07-12T23:27:16.967

For a similar example use case, I just hashed the string values. See http://blog.cloudera.com/blog/2014/03/why-apache-spark-is-a-crossover-hit-for-data-scientists/

def nnHash(tag: String) = tag.hashCode & 0x7FFFFF  // keep only the low bits, giving a non-negative Int
val tagHashes = postIDTags.map(_._2).distinct.map(tag => (nnHash(tag), tag))  // (hash, original tag) pairs

It sounds like you're already doing something like this, although hashing can be easier to manage.
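As a hedged illustration of feeding the hashed IDs into ALS (the reviews RDD and its field order are assumptions, and the rank/iterations/lambda values are arbitrary):

import org.apache.spark.mllib.recommendation.{ALS, Rating}

// reviews: RDD[(String, String, Double)] of (username, sku, score), an assumed input
val ratings = reviews.map { case (user, sku, score) =>
  Rating(nnHash(user), nnHash(sku), score)
}
val model = ALS.train(ratings, 10, 10, 0.01)  // rank 10, 10 iterations, lambda 0.01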

Matei suggested an approach here for emulating zipWithIndex on an RDD, which amounts to assigning IDs within each partition that are globally unique: https://groups.google.com/forum/#!topic/spark-users/WxXvcn2gl1E

Answered 2014-05-29T20:38:07.050

Another easy option, if you are using DataFrames and only care about uniqueness, is to use the function MonotonicallyIncreasingID:

import org.apache.spark.sql.functions.monotonicallyIncreasingId 
val newDf = df.withColumn("uniqueIdColumn", monotonicallyIncreasingId)

Edit: MonotonicallyIncreasingID has been deprecated and removed since Spark 2.0; it is now known as monotonically_increasing_id.
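For Spark 2.0+, the same thing under the new name would look roughly like this:

import org.apache.spark.sql.functions.monotonically_increasing_id
val newDf = df.withColumn("uniqueIdColumn", monotonically_increasing_id())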

Answered 2016-07-18T14:43:34.373

monotonically_increasing_id() appears to be the answer, but unfortunately it won't work for ALS, since it produces 64-bit numbers and ALS expects 32-bit ones (see my comment below radek1st's answer for details).

The solution I found is to use zipWithIndex(), as mentioned in Darabos' answer. Here's how to implement it:

If you already have a single-column DataFrame of your distinct users, userids, you can create a lookup table (LUT) as follows:

# PySpark code
from pyspark.sql.types import StructType, StructField, StringType, IntegerType
schema = StructType([StructField("userid", StringType(), True), StructField("user_als_id", IntegerType(), True)])
user_als_id_LUT = sqlContext.createDataFrame(userids.rdd.map(lambda x: x[0]).zipWithIndex(), schema)

Now you can:

  • use this LUT to get ALS-friendly integer IDs to feed to ALS
  • use this LUT to do the reverse lookup when you need to go from ALS IDs back to the original IDs (a sketch follows below)

Obviously, do the same thing for items.
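For illustration, a rough Scala sketch of the forward and reverse lookups (reviewsDf, alsResults and their column names are assumptions; user_als_id_LUT is assumed to be the same lookup table as above, held as a DataFrame):

// Forward: attach the ALS-friendly integer ID to each review row
val reviewsWithAlsIds = reviewsDf.join(user_als_id_LUT, Seq("userid"))
// Reverse: map ALS output keyed by user_als_id back to the original usernames
val resultsWithNames = alsResults.join(user_als_id_LUT, Seq("user_als_id"))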

Answered 2016-09-01T01:45:31.357

People have already recommended monotonically_increasing_id(), and mentioned the problem that it creates Longs, not Ints.

However, in my experience (caveat: Spark 1.6), if you use it on a single executor (repartition to 1 beforehand), there is no executor prefix used, and the number can safely be cast to an Int. Obviously, you need to have fewer than Integer.MAX_VALUE rows.
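A minimal sketch of that idea (using the Spark 2.x function name; on 1.6 it is monotonicallyIncreasingId, and the row-count caveat above still applies):

import org.apache.spark.sql.functions.monotonically_increasing_id

// Single partition, so no partition prefix lands in the upper bits and the values start at 0
val withIntId = df.repartition(1)
  .withColumn("id", monotonically_increasing_id().cast("int"))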

Answered 2016-11-29T12:21:49.167