One way to handle this is to use ML transformers. First, let's convert your data to a DataFrame:
ratings_df = sqlContext.createDataFrame([
(u'97990079', u'18_34', 2), (u'585853655', u'11_8', 1),
(u'1398696913', u'6_20', 1), (u'612168869', u'7_16', 1),
(u'2272846159', u'11_17', 2)],
("user_id", "item_id_str", "rating"))
Next we need a StringIndexer:
from pyspark.ml.feature import StringIndexer
indexer = StringIndexer(inputCol="item_id_str", outputCol="item_id")
Finally, let's transform the DataFrame using the indexer:
from pyspark.sql.functions import col
transformed = (indexer
.fit(ratings_df)
.transform(ratings_df)
# note: ALS expects ids in integer range; string values that
# overflow a 32-bit int will be cast to null here
.withColumn("user_id", col("user_id").cast("integer"))
.select("user_id", "item_id", "rating"))
and convert it to an RDD[Rating]:
from pyspark.mllib.recommendation import Rating
ratings_rdd = transformed.rdd.map(lambda r: Rating(r.user_id, r.item_id, r.rating))
In newer versions of Spark you can skip the conversion and use ml.recommendation.ALS directly:
from pyspark.ml.recommendation import ALS
als = (ALS(userCol="user_id", itemCol="item_id", ratingCol="rating")
.fit(transformed))