当谈到 ML 算法的实现时,我是一个新手。我想实现一个推荐引擎,并在经过少量实验后知道协同过滤可以用于相同的目的。我也在使用 Apache Spark。我从其中一个博客中获得了帮助,并尝试在我的本地实现相同的功能。我试过的 PFB 代码。每次我执行此操作时,正在打印的推荐计数始终为零。我没有看到任何明显的错误。有人可以帮我理解这一点。此外,请随时提供在这方面可以参考的任何其他参考。
package mllib.example;
import org.apache.log4j.Level;
import org.apache.log4j.Logger;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.Function;
import org.apache.spark.api.java.function.PairFunction;
import org.apache.spark.api.java.function.VoidFunction;
import org.apache.spark.mllib.recommendation.ALS;
import org.apache.spark.mllib.recommendation.MatrixFactorizationModel;
import org.apache.spark.mllib.recommendation.Rating;
import scala.Tuple2;
public class RecommendationEngine {
public static void main(String[] args) {
// Create Java spark context
SparkConf conf = new SparkConf().setAppName("Recommendation System Example").setMaster("local[2]").set("spark.executor.memory","1g");
JavaSparkContext sc = new JavaSparkContext(conf);
// Read user-item rating file. format - userId,itemId,rating
JavaRDD<String> userItemRatingsFile = sc.textFile(args[0]);
System.out.println("Count is "+userItemRatingsFile.count());
// Read item description file. format - itemId, itemName, Other Fields,..
JavaRDD<String> itemDescritpionFile = sc.textFile(args[1]);
System.out.println("itemDescritpionFile Count is "+itemDescritpionFile.count());
// Map file to Ratings(user,item,rating) tuples
JavaRDD<Rating> ratings = userItemRatingsFile.map(new Function<String, Rating>() {
public Rating call(String s) {
String[] sarray = s.split(",");
return new Rating(Integer.parseInt(sarray[0]), Integer
.parseInt(sarray[1]), Double.parseDouble(sarray[2]));
}
});
System.out.println("Ratings RDD Object"+ratings.first().toString());
// Create tuples(itemId,ItemDescription), will be used later to get names of item from itemId
JavaPairRDD<Integer,String> itemDescritpion = itemDescritpionFile.mapToPair(
new PairFunction<String, Integer, String>() {
@Override
public Tuple2<Integer, String> call(String t) throws Exception {
String[] s = t.split(",");
return new Tuple2<Integer,String>(Integer.parseInt(s[0]), s[1]);
}
});
System.out.println("itemDescritpion RDD Object"+ratings.first().toString());
// Build the recommendation model using ALS
int rank = 10; // 10 latent factors
int numIterations = Integer.parseInt(args[2]); // number of iterations
MatrixFactorizationModel model = ALS.trainImplicit(JavaRDD.toRDD(ratings),
rank, numIterations);
//ALS.trainImplicit(arg0, arg1, arg2)
// Create user-item tuples from ratings
JavaRDD<Tuple2<Object, Object>> userProducts = ratings
.map(new Function<Rating, Tuple2<Object, Object>>() {
public Tuple2<Object, Object> call(Rating r) {
return new Tuple2<Object, Object>(r.user(), r.product());
}
});
// Calculate the itemIds not rated by a particular user, say user with userId = 1
JavaRDD<Integer> notRatedByUser = userProducts.filter(new Function<Tuple2<Object,Object>, Boolean>() {
@Override
public Boolean call(Tuple2<Object, Object> v1) throws Exception {
if (((Integer) v1._1).intValue() != 0) {
return true;
}
return false;
}
}).map(new Function<Tuple2<Object,Object>, Integer>() {
@Override
public Integer call(Tuple2<Object, Object> v1) throws Exception {
return (Integer) v1._2;
}
});
// Create user-item tuples for the items that are not rated by user, with user id 1
JavaRDD<Tuple2<Object, Object>> itemsNotRatedByUser = notRatedByUser
.map(new Function<Integer, Tuple2<Object, Object>>() {
public Tuple2<Object, Object> call(Integer r) {
return new Tuple2<Object, Object>(0, r);
}
});
// Predict the ratings of the items not rated by user for the user
JavaRDD<Rating> recomondations = model.predict(itemsNotRatedByUser.rdd()).toJavaRDD().distinct();
// Sort the recommendations by rating in descending order
recomondations = recomondations.sortBy(new Function<Rating,Double>(){
@Override
public Double call(Rating v1) throws Exception {
return v1.rating();
}
}, false, 1);
System.out.println("recomondations Total is "+recomondations.count());
// Get top 10 recommendations
JavaRDD<Rating> topRecomondations = sc.parallelize(recomondations.take(10));
// Join top 10 recommendations with item descriptions
JavaRDD<Tuple2<Rating, String>> recommendedItems = topRecomondations.mapToPair(
new PairFunction<Rating, Integer, Rating>() {
@Override
public Tuple2<Integer, Rating> call(Rating t) throws Exception {
return new Tuple2<Integer,Rating>(t.product(),t);
}
}).join(itemDescritpion).values();
System.out.println("recommendedItems count is "+recommendedItems.count());
//Print the top recommendations for user 1.
recommendedItems.foreach(new VoidFunction<Tuple2<Rating,String>>() {
@Override
public void call(Tuple2<Rating, String> t) throws Exception {
System.out.println(t._1.product() + "\t" + t._1.rating() + "\t" + t._2);
}
});
另外,我看到这项工作正在运行很长时间。每次创建模型时。有没有一种方法可以创建一次模型,将其持久化并为连续预测加载相同的模型。我们能否提高这项工作的执行速度
提前致谢