1

我是 PySpark 的新手。我正在尝试使用pythonpyspark.mllib.recommendation pakage为推荐目的实现ALS(交替最小二乘矩阵分解)。根据PySpark 文档,我发现我应该使用排名指标来评估系统的隐式反馈。但不幸的是,文档在 python 部分没有更新,当我尝试自己实现它时,我在 RDD 类型上遇到了一些不同的问题。请帮我找到错误。我不确定是否应该在createDataFrame使用.rdd或者我应该使用另一个函数来创建 rdd 类型......

def build_model_Als(self):
   data = self.load_from_redis()
   self.dataframe = Pandas.DataFrame({"user": data[0:, 0], "item": data[0:, 1], "rate": data[0:, 2]})

    train = self.dataframe.sample(frac=0.8 , random_state=99)
    test = self.dataframe.loc[~self.dataframe.index.isin(train.index), :]

    ts = test.drop(columns=['rate'])
    ps = test.drop(columns=['user'])

    ratings = spark.createDataFrame(self.dataframe).rdd
    testdata = spark.createDataFrame(ts).rdd

    self.model = ALS.train(ratings, rank=10, iterations=10, lambda_=0.01, nonnegative=True)
    predictions = self.model.predictAll(testdata)
    ratesAndPreds = ratings.join(predictions)
    metrics = RankingMetrics(ratesAndPreds)
    print("Mean average precision =", metrics.meanAveragePrecision )

这是错误:

py4j.protocol.Py4JJavaError: 调用 o207.meanAveragePrecision 时出错。:org.apache.spark.SparkException:作业因阶段失败而中止:阶段 213.0 中的任务 0 失败 1 次,最近一次失败:阶段 213.0 中丢失的任务 0.0(TID 273,本地主机,执行程序驱动程序):java.lang.ClassCastException : java.lang.Long 不能在 org.apache.spark.sql.catalyst.expressions.GenericRow 的 org.apache.spark.sql.Row$class.getSeq(Row.scala:283) 中转换为 scala.collection.Seq .getSeq(rows.scala:166) at org.apache.spark.mllib.api.python.PythonMLLibAPI$$anonfun$newRankingMetrics$1.apply(PythonMLLibAPI.scala:1070) at org.apache.spark.mllib.api.python .PythonMLLibAPI$$anonfun$newRankingMetrics$1.apply(PythonMLLibAPI.scala:1070) 在 scala.collection.Iterator$$anon$11.next(Iterator.scala:410) 在 scala.collection。

4

1 回答 1

0

我尝试了基于 PYSpark 的 RDD 和 Dataframe 的不同方法来构建、拟合和评估模型。最后我发现“join”函数不能帮助实现rateAndPreds,因为它应该使用spark.SparkContext.parallelize函数构建在 RDD 结构中。我已将功能更改如下:

 def build_model_Als(self):
    data = self.load_from_redis()
    self.dataframe = pd.DataFrame({"user": data[0:, 0], "item": data[0:, 1], "rate": data[0:, 2]})
    train = self.dataframe.sample(frac=0.8 , random_state=99)
    test = self.dataframe.loc[~self.dataframe.index.isin(train.index), :]
    ts = test.drop(columns=['rate'])
    logging.info("start train")

    als = ALS(rank=10, maxIter=10, regParam=0.01, userCol="user", itemCol="item", implicitPrefs=True,
              ratingCol="rate", coldStartStrategy="drop", nonnegative=True)
    self.model = als.fit(spark.createDataFrame(train))
    logging.info("done")

    global_user = None
    user_list = np.array([], dtype=np.float64)
    testItems = list()
    for row in test.iterrows():

        if row[1]['user'] != global_user:
            user_list = np.append(user_list, row[1]['user'])
            testItems.append(int(row[1]['item']))
            global_user = row[1]['user']
        else:
            testItems.append(int(row[1]['item']))
        true_list[global_user] = testItems

    pandasDf = pd.DataFrame({'user': user_list})
    sub_user = spark.createDataFrame(pandasDf)
    labelsList = list()
    for user, items in self.model.recommendForUserSubset(sub_user, 30).collect():

        predict_items = [i.item for i in items]
        labelsList.append((predict_items, true_list[user]))
    labels = spark.sparkContext.parallelize(labelsList)
    metrics = RankingMetrics(labels)
    print(metrics.meanAveragePrecision)

重点是spark.sparkContext.parallelize返回一个正确的 RDD 以在RankingMetrics.meanAveragePrecision找到交集,但join不能。例如: spark.sparkContext.parallelize([([1,2,3,4], [1,2,12,13,1]), ]) 为 meanAveragePrecision 生成适当的 RDD,结果为:40%

于 2020-01-05T11:43:30.277 回答