我正在使用此处提供的 MovieLens 数据集为电影制作推荐系统:http: //grouplens.org/datasets/movielens/
为了计算这个推荐系统,我在 scala 中使用了 Flink 的 ML 库,特别是 ALS 算法 ( org.apache.flink.ml.recommendation.ALS
)。
我首先将电影的评分映射到 a DataSet[(Int, Int, Double)]
,然后创建 atrainingSet
和 a testSet
(参见下面的代码)。
我的问题是当我将ALS.fit
函数与整个数据集(所有评级)一起使用时没有错误,但如果我只删除一个评级,拟合函数不再起作用,我不明白为什么.
你有什么想法?:)
使用的代码:
Rating.scala
case class Rating(userId: Int, movieId: Int, rating: Double)
预处理.scala
object PreProcessing {
def getRatings(env : ExecutionEnvironment, ratingsPath : String): DataSet[Rating] = {
env.readCsvFile[(Int, Int, Double)](
ratingsPath, ignoreFirstLine = true,
includedFields = Array(0,1,2)).map{r => new Rating(r._1, r._2, r._3)}
}
处理.scala
object Processing {
private val ratingsPath: String = "Path_to_ratings.csv"
def main(args: Array[String]) {
val env = ExecutionEnvironment.getExecutionEnvironment
val ratings: DataSet[Rating] = PreProcessing.getRatings(env, ratingsPath)
val trainingSet : DataSet[(Int, Int, Double)] =
ratings
.map(r => (r.userId, r.movieId, r.rating))
.sortPartition(0, Order.ASCENDING)
.first(ratings.count().toInt)
val als = ALS()
.setIterations(10)
.setNumFactors(10)
.setBlocks(150)
.setTemporaryPath("/tmp/tmpALS")
val parameters = ParameterMap()
.add(ALS.Lambda, 0.01) // After some tests, this value seems to fit the problem
.add(ALS.Seed, 42L)
als.fit(trainingSet, parameters)
}
}
“但如果我只删除一个评级”
val trainingSet : DataSet[(Int, Int, Double)] =
ratings
.map(r => (r.userId, r.movieId, r.rating))
.sortPartition(0, Order.ASCENDING)
.first((ratings.count()-1).toInt)
错误 :
06/19/2015 15:00:24 CoGroup(在 org.apache.flink.ml.recommendation.ALS$.updateFactors(ALS.scala:570) 上的 CoGroup)(4/4)切换到 FAILED
java.lang.ArrayIndexOutOfBoundsException:5
在 org.apache.flink.ml.recommendation.ALS$BlockRating.apply(ALS.scala:358)
在 org.apache.flink.ml.recommendation.ALS$$anon$111.coGroup(ALS.scala:635)
在 org.apache.flink.runtime.operators.CoGroupDriver.run(CoGroupDriver.java:152)
...