3

我对 spark 和 scala 都比较陌生。我试图在 spark 上使用 scala 实现协同过滤。下面是代码

import org.apache.spark.mllib.recommendation.ALS
import org.apache.spark.mllib.recommendation.Rating

val data = sc.textFile("/user/amohammed/CB/input-cb.txt")

val distinctUsers = data.map(x => x.split(",")(0)).distinct().map(x => x.toInt)

val distinctKeywords = data.map(x => x.split(",")(1)).distinct().map(x => x.toInt)

val ratings = data.map(_.split(',') match {
  case Array(user, item, rate) => Rating(user.toInt,item.toInt, rate.toDouble)
})

val model = ALS.train(ratings, 1, 20, 0.01)

val keywords = distinctKeywords collect
  distinctUsers.map(x => {(x, keywords.map(y => model.predict(x,y)))}).collect()

它在最后一行抛出一个scala.MatchError: null org.apache.spark.rdd.PairRDDFunctions.lookup(PairRDDFunctions.scala:571)如果我将 distinctUsers rdd 收集到一个数组中并执行相同的代码,Thw 代码可以正常工作:

val users = distinctUsers collect
  users.map(x => {(x, keywords.map(y => model.predict(x, y)))})

处理 RDD 时我在哪里弄错了?

Spark 版本:1.0.0 Scala 版本:2.10.4

4

1 回答 1

1

在堆栈跟踪中进一步调用一次,MatrixFactorizationModel源代码的第 43 行说:

val userVector = new DoubleMatrix(userFeatures.lookup(user).head)

注意userFeatures字段model本身就是另一个RDD;我相信当匿名函数块关闭时它没有被正确序列化model,因此lookup它的方法失败了。我也尝试将两者都model放入keywords广播变量中,但这也不起作用。

与其回退到 Scala 集合并失去 Spark 的优势,不如坚持使用 RDD 并利用其他方式来转换它们。

我会从这个开始:

val ratings = data.map(_.split(',') match {
  case Array(user, keyword, rate) => Rating(user.toInt, keyword.toInt, rate.toDouble)
})

// instead of parsing the original RDD's strings three separate times,
// you can map the "user" and "product" fields of the Rating case class

val distinctUsers = ratings.map(_.user).distinct()
val distinctKeywords = ratings.map(_.product).distinct()

val model = ALS.train(ratings, 1, 20, 0.01)

然后,我们可以获取所有可能的用户-关键字对的笛卡尔积作为 RDD,而不是逐个计算每个预测,并使用predictMatrixFactorizationModel 中的另一种方法,该方法将此类对的 RDD 作为其参数。

val userKeywords = distinctUsers.cartesian(distinctKeywords)

val predictions = model.predict(userKeywords).map { case Rating(user, keyword, rate) =>
  (user, Map(keyword -> rate))
}.reduceByKey { _ ++ _ }

现在predictions,每个用户都有一个不可变的地图,可以查询特定关键字的预测评级。如果您特别想要原始示例中的数组,则可以执行以下操作:

val keywords = distinctKeywords.collect() // add .sorted if you want them in order
val predictionArrays = predictions.mapValues(keywords.map(_))

警告:我使用 Spark 1.0.1 对此进行了测试,因为它是我安装的,但它也应该适用于 1.0.0。

于 2014-07-15T03:05:24.937 回答