6

问题已解决最终在底部检查我的解决方案


最近我正在尝试运行 Mahout in Action 的 chaper6(清单 6.1 ~ 6.4)中的推荐示例。但是我遇到了一个问题,我用谷歌搜索了一遍,但找不到解决方案。

这是问题所在:我有一对映射器减速器

public final class WikipediaToItemPrefsMapper extends
    Mapper<LongWritable, Text, VarLongWritable, VarLongWritable> {

private static final Pattern NUMBERS = Pattern.compile("(\\d+)");

@Override
protected void map(LongWritable key, Text value, Context context)
        throws IOException, InterruptedException {
    String line = value.toString();
    Matcher m = NUMBERS.matcher(line);
    m.find();
    VarLongWritable userID = new VarLongWritable(Long.parseLong(m.group()));
    VarLongWritable itemID = new VarLongWritable();
    while (m.find()) {
        itemID.set(Long.parseLong(m.group()));
        context.write(userID, itemID);
    }
}
}

public class WikipediaToUserVectorReducer
    extends
    Reducer<VarLongWritable, VarLongWritable, VarLongWritable, VectorWritable> {

public void reduce(VarLongWritable userID,
        Iterable<VarLongWritable> itemPrefs, Context context)
        throws IOException, InterruptedException {
    Vector userVector = new RandomAccessSparseVector(Integer.MAX_VALUE, 100);
    for (VarLongWritable itemPref : itemPrefs) {
        userVector.set((int) itemPref.get(), 1.0f);
    }
    context.write(userID, new VectorWritable(userVector));
}
}

reducer 输出一个 userID 和一个 userVector,它看起来像这样:98955 {590:1.0 22:1.0 9059:1.0 3:1.0 2:1.0 1:1.0}

然后我想用另一对mapper-reducer来处理这个数据

public class UserVectorSplitterMapper
    extends
    Mapper<VarLongWritable, VectorWritable, IntWritable, VectorOrPrefWritable> {

public void map(VarLongWritable key, VectorWritable value, Context context)
        throws IOException, InterruptedException {
    long userID = key.get();
    Vector userVector = value.get();
    Iterator<Vector.Element> it = userVector.iterateNonZero();
    IntWritable itemIndexWritable = new IntWritable();
    while (it.hasNext()) {
        Vector.Element e = it.next();
        int itemIndex = e.index();
        float preferenceValue = (float) e.get();
        itemIndexWritable.set(itemIndex);
        context.write(itemIndexWritable, 
                new VectorOrPrefWritable(userID, preferenceValue));
    }
}
}

当我尝试运行该作业时,它会显示错误

org.apache.hadoop.io.Text 不能转换为 org.apache.mahout.math.VectorWritable

第一个 mapper-reducer 将输出写入 hdfs,第二个 mapper-reducer 尝试读取输出,映射器可以将 98955 转换为 VarLongWritable,但无法转换 {590:1.0 22:1.0 9059:1.0 3: 1.0 2:1.0 1:1.0} 到 VectorWritable,所以我想知道有没有办法让第一个 mapper-reducer 直接将输出发送到第二对,那么就不需要进行数据转换。我查看了 Hadoop 的运行情况,以及 hadoop:权威指南,似乎没有这样的方法可以做到这一点,有什么建议吗?


问题解决了

解决方案:通过使用SequenceFileOutputFormat,我们可以将第一个 MapReduce 工作流的 reduce 结果输出并保存到 DFS 上,然后第二个 MapReduce 工作流可以在创建映射器时使用SequenceFileInputFormat类作为参数读取临时文件作为输入。由于向量将保存在具有特定格式的二进制序列文件中,SequenceFileInputFormat可以读取它并将其转换回向量格式。

下面是一些示例代码:

confFactory ToItemPrefsWorkFlow = new confFactory
            (new Path("/dbout"), //input file path
             new Path("/mahout/output.txt"), //output file path
             TextInputFormat.class, //input format
             VarLongWritable.class, //mapper key format
             Item_Score_Writable.class, //mapper value format
             VarLongWritable.class, //reducer key format
             VectorWritable.class, //reducer value format
             **SequenceFileOutputFormat.class** //The reducer output format             
             
    );
    ToItemPrefsWorkFlow.setMapper( WikipediaToItemPrefsMapper.class);
    ToItemPrefsWorkFlow.setReducer(WikipediaToUserVectorReducer.class);
    JobConf conf1 = ToItemPrefsWorkFlow.getConf();
    
    
    confFactory UserVectorToCooccurrenceWorkFlow = new confFactory
            (new Path("/mahout/output.txt"),
             new Path("/mahout/UserVectorToCooccurrence"),
             SequenceFileInputFormat.class, //notice that the input format of mapper of the second work flow is now SequenceFileInputFormat.class
             //UserVectorToCooccurrenceMapper.class,
             IntWritable.class,
             IntWritable.class,
             IntWritable.class,
             VectorWritable.class,
             SequenceFileOutputFormat.class                                      
             );
     UserVectorToCooccurrenceWorkFlow.setMapper(UserVectorToCooccurrenceMapper.class);
     UserVectorToCooccurrenceWorkFlow.setReducer(UserVectorToCooccurrenceReducer.class);
    JobConf conf2 = UserVectorToCooccurrenceWorkFlow.getConf();
    
    JobClient.runJob(conf1);
    JobClient.runJob(conf2);

如果您对此有任何问题,请随时与我联系

4

3 回答 3

4

您需要显式配置第一个作业的输出以使用 SequenceFileOutputFormat 并定义输出键和值类:

job.setOutputFormat(SequenceFileOutputFormat.class);
job.setOutputKeyClass(VarLongWritable.class);
job.setOutputKeyClass(VectorWritable.class);

在没有看到您的驱动程序代码的情况下,我猜您正在使用 TextOutputFormat 作为第一个作业的输出,并将 TextInputFormat 作为第二个作业的输入-并且此输入格式将成对发送<Text, Text>到第二个映射器

于 2012-04-22T13:12:57.150 回答
1

我是hadoop的初学者,这只是我对答案的猜测,所以请耐心等待/指出它是否看起来很幼稚。

我认为在不保存 HDFS 的情况下从减速器发送到下一个映射器是不合理的。因为“哪个数据拆分到哪个映射器”被优雅地设计为满足局部性标准。(转到本地存储数据的映射器节点)。

如果您不将其存储在 HDFS 上,则很可能所有数据都将通过网络传输,速度很慢并且可能会导致带宽问题。

于 2012-04-23T23:43:19.527 回答
0

您必须暂时保存第一个 map-reduce 的输出,以便第二个可以使用它。

这可能会帮助您了解第一个 map-reduce 的输出如何传递给第二个。(这是基于Apache nutch的Generator.java)。

这是第一个 map-reduce 输出的临时目录:

Path tempDir =
  new Path(getConf().get("mapred.temp.dir", ".")
           + "/job1-temp-"
           + Integer.toString(new Random().nextInt(Integer.MAX_VALUE)));

设置第一个 map-reduce 作业:

JobConf job1 = getConf();
job1.setJobName("job 1");
FileInputFormat.addInputPath(...);
sortJob.setMapperClass(...);

FileOutputFormat.setOutputPath(job1, tempDir);
job1.setOutputFormat(SequenceFileOutputFormat.class);
job1.setOutputKeyClass(Text.class);
job1.setOutputValueClass(...);
JobClient.runJob(job1);

观察在作业配置中设置了输出目录。在第二个工作中使用它:

JobConf job2 = getConf();
FileInputFormat.addInputPath(job2, tempDir);
job2.setReducerClass(...);
JobClient.runJob(job2);

完成后记得清理临时目录:

// clean up
FileSystem fs = FileSystem.get(getConf());
fs.delete(tempDir, true);

希望这可以帮助。

于 2012-04-22T10:35:20.613 回答