2

我想知道分布式 mahout 推荐器作业如何org.apache.mahout.cf.taste.hadoop.item.RecommenderJob处理存在重复和三重用户、项目条目但具有不同偏好值的 csv 文件。例如,如果我有一个 .csv 文件,其中包含以下条目

1,1,0.7
1,2,0.7
1,2,0.3
1,3,0.7
1,3,-0.7

Mahout 的数据模型将如何处理这个问题?它会总结给定用户、项目条目的偏好值(例如,对于用户项目 1,2,偏好将是 (0.7 + 0.3)),还是对这些值进行平均(例如,对于用户项目 1,2,偏好是(0.7 + 0.3)/2) 还是默认为最后一个用户,它检测到的项目条目(例如,对于用户 1,2,首选项值设置为 0.3)。

我问这个问题是因为我正在考虑基于多个偏好指标(项目视图、喜欢、不喜欢、保存到购物车等)的建议。如果数据模型将偏好值视为线性权重(例如,项目视图加上保存到愿望清单具有比项目视图更高的偏好分数),这将是有帮助的。如果 datamodel 已经通过求和来处理这个问题,它将为我节省额外的 map-reduce 来根据多个指标进行排序和计算总分的繁琐工作。任何人都可以就 mahout .csv 数据模型在这方面提供的任何澄清org.apache.mahout.cf.taste.hadoop.item.RecommenderJob将不胜感激。谢谢。

4

2 回答 2

5

不,它会覆盖。该模型不是附加的。然而,Myrrix 中的模型,这个代码的一个衍生物(我正在商业化)有一个基本的附加数据模式,只是因为你给出的原因。输入值是权重并且总是相加的。

于 2013-05-17T15:48:49.363 回答
1

在开始计算之前合并它。

例子:

import java.io.IOException;
import java.util.Iterator;
import java.util.StringTokenizer;

import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.FloatWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.FileInputFormat;
import org.apache.hadoop.mapred.FileOutputFormat;
import org.apache.hadoop.mapred.JobClient;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.MapReduceBase;
import org.apache.hadoop.mapred.Mapper;
import org.apache.hadoop.mapred.OutputCollector;
import org.apache.hadoop.mapred.Reducer;
import org.apache.hadoop.mapred.Reporter;
import org.apache.hadoop.mapred.TextInputFormat;
import org.apache.hadoop.mapred.TextOutputFormat;

public final class Merge {
    public Merge() {
    }

    public static class MergeMapper extends MapReduceBase implements
            Mapper<LongWritable, Text, Text, FloatWritable> {

        public void map(LongWritable key, Text value, OutputCollector<Text, FloatWritable> collector,
                Reporter reporter) throws IOException {
            // TODO Auto-generated method stub
            String line = value.toString();
            StringTokenizer tokenizer = new StringTokenizer(line);
            if (tokenizer.hasMoreTokens()) {
                String userId = tokenizer.nextToken(",");
                String itemId = tokenizer.nextToken(",");
                FloatWritable score = new FloatWritable(Float.valueOf(tokenizer.nextToken(",")));
                collector.collect(new Text(userId + "," + itemId), score);
            }
            else {
                System.out.println("empty line " + line);
            }

        }
    }

    public static class MergeReducer extends MapReduceBase implements
            Reducer<Text, FloatWritable, Text, FloatWritable> {

        public void reduce(Text key, Iterator<FloatWritable> scores,
                OutputCollector<Text, FloatWritable> collector, Reporter reporter) throws IOException {
            // TODO Auto-generated method stub
            float sum = 0.0f;
            while (scores.hasNext()) {
                sum += scores.next().get();
            }
            if (sum != 0.0)
                collector.collect(key, new FloatWritable(sum));
        }
    }


    public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {

        JobConf conf = new JobConf(Merge.class);
        conf.setJobName("Merge Data");

        conf.setOutputKeyClass(Text.class);
        conf.setOutputValueClass(FloatWritable.class);

        conf.setMapperClass(MergeMapper.class);
        // combine the same key items
        conf.setCombinerClass(MergeReducer.class);
        conf.setReducerClass(MergeReducer.class);

        conf.setInputFormat(TextInputFormat.class);
        conf.set("mapred.textoutputformat.separator", ",");
        conf.setOutputFormat(TextOutputFormat.class);

        FileInputFormat.setInputPaths(conf, new Path("hdfs://localhost:49000/tmp/data"));
        FileOutputFormat.setOutputPath(conf, new Path("hdfs://localhost:49000/tmp/data/output"));

        JobClient.runJob(conf);
    }
}
于 2013-12-25T05:37:53.013 回答