我是 PySpark 的新手。我正在尝试使用python和pyspark.mllib.recommendation pakage为推荐目的实现ALS(交替最小二乘矩阵分解)。根据PySpark 文档,我发现我应该使用排名指标来评估系统的隐式反馈。但不幸的是,文档在 python 部分没有更新,当我尝试自己实现它时,我在 RDD 类型上遇到了一些不同的问题。请帮我找到错误。我不确定是否应该在createDataFrame使用.rdd或者我应该使用另一个函数来创建 rdd 类型......
def build_model_Als(self):
data = self.load_from_redis()
self.dataframe = Pandas.DataFrame({"user": data[0:, 0], "item": data[0:, 1], "rate": data[0:, 2]})
train = self.dataframe.sample(frac=0.8 , random_state=99)
test = self.dataframe.loc[~self.dataframe.index.isin(train.index), :]
ts = test.drop(columns=['rate'])
ps = test.drop(columns=['user'])
ratings = spark.createDataFrame(self.dataframe).rdd
testdata = spark.createDataFrame(ts).rdd
self.model = ALS.train(ratings, rank=10, iterations=10, lambda_=0.01, nonnegative=True)
predictions = self.model.predictAll(testdata)
ratesAndPreds = ratings.join(predictions)
metrics = RankingMetrics(ratesAndPreds)
print("Mean average precision =", metrics.meanAveragePrecision )
这是错误:
py4j.protocol.Py4JJavaError: 调用 o207.meanAveragePrecision 时出错。:org.apache.spark.SparkException:作业因阶段失败而中止:阶段 213.0 中的任务 0 失败 1 次,最近一次失败:阶段 213.0 中丢失的任务 0.0(TID 273,本地主机,执行程序驱动程序):java.lang.ClassCastException : java.lang.Long 不能在 org.apache.spark.sql.catalyst.expressions.GenericRow 的 org.apache.spark.sql.Row$class.getSeq(Row.scala:283) 中转换为 scala.collection.Seq .getSeq(rows.scala:166) at org.apache.spark.mllib.api.python.PythonMLLibAPI$$anonfun$newRankingMetrics$1.apply(PythonMLLibAPI.scala:1070) at org.apache.spark.mllib.api.python .PythonMLLibAPI$$anonfun$newRankingMetrics$1.apply(PythonMLLibAPI.scala:1070) 在 scala.collection.Iterator$$anon$11.next(Iterator.scala:410) 在 scala.collection。