3

我需要使用 map reduce 来实现一个功能。

要求在下面提到。

  1. 映射器的输入是一个包含两列 productId 和 Salescount 的文件
  2. 减速机产量,销售额总和

要求是我需要计算 salescount / sum(salescount)。

为此,我计划使用嵌套映射减少。但是对于第二个映射器,我需要使用第一个 reducers 输出和第一个 map 的输入。

我该如何实现这一点。或者有什么替代方法吗?

问候维努

4

2 回答 2

3

您可以按照自己的方式使用ChainMapperChainReducerPIPE Mappers 和 Reducers。请看这里

以下将类似于您需要实现的代码片段

JobConf mapBConf = new JobConf(false);

JobConf reduceConf = new JobConf(false);

ChainMapper.addMapper(conf, FirstMapper.class, FirstMapperInputKey.class, FirstMapperInputValue.class,
   FirstMapperOutputKey.class, FirstMapperOutputValue.class, false, mapBConf);

ChainReducer.setReducer(conf, FirstReducer.class, FirstMapperOutputKey.class, FirstMapperOutputValue.class,
   FirstReducerOutputKey.class, FirstReducerOutputValue.class, true, reduceConf);

ChainReducer.addMapper(conf, SecondMapper.class, FirstReducerOutputKey.class, FirstReducerOutputValue.class,
   SecondMapperOutputKey.class, SecondMapperOutputValue.class, false, null);

ChainReducer.setReducer(conf, SecondReducer.class, SecondMapperOutputKey.class, SecondMapperOutputValue.class, SecondReducerOutputKey.class, SecondReducerOutputValue.class, true, reduceConf);

或者如果您不想使用多个 Mapper 和 Reducers,您可以执行以下操作

public static class ProductIndexerMapper extends MapReduceBase implements Mapper<LongWritable, Text, Text, LongWritable> {

    private static Text productId = new Text();
    private static LongWritable salesCount = new LongWritable();

    @Override
    public void map(LongWritable key, Text value,
            OutputCollector<Text, LongWritable> output, Reporter reporter)
            throws IOException {
        String[] values = value.toString().split("\t");
        productId.set(values[0]);           
        salesCount.set(Long.parseLong(values[1]));
        output.collect(productId, salesCount);
    }

}

public static class ProductIndexerReducer extends MapReduceBase implements Reducer<Text, LongWritable, Text, LongWritable> {

    private static LongWritable productWritable = new LongWritable();

    @Override
    public void reduce(Text key, Iterator<LongWritable> values,
            OutputCollector<Text, LongWritable> output, Reporter reporter)
            throws IOException {
        List<LongWritable> items = new ArrayList<LongWritable>(); 
        long total = 0;
        LongWritable item = null;
        while(values.hasNext()) {
            item = values.next();
            total += item.get();
            items.add(item);
        }
        Iterator<LongWritable> newValues = items.iterator();
        while(newValues.hasNext()) {
            productWritable.set(newValues.next().get()/total);
            output.collect(key, productWritable);
        }
    }

}

`

于 2012-10-23T12:34:14.433 回答
0

有了这个用例,我相信我们不需要两个不同的 mappers/mapreduce 作业来实现这一点。(作为上述评论中给出的答案的扩展)

假设您有一个非常大的输入文件,在 HDFS 中拆分为多个块。当您使用此文件作为输入触发 MapReduce 作业时,多个映射器(等于输入块的数量)将开始并行执行。

在您的映射器实现中,从输入中读取每一行并将 productId 作为键和 saleCount 作为值写入上下文。该数据被传递给Reducer。

我们知道,在一个 MR 作业中,所有具有相同 key 的数据都会传递给同一个 reducer。现在,在您的 reducer 实现中,您可以计算特定 productId 的所有 saleCounts 的总和。

注意:我不确定分子中的值“salescount”。

假设它是特定产品出现次数的计数,请使用计数器在计算 SUM(saleCount) 的同一 for 循环中添加并获取总销售计数。所以,我们有

totalCount -> 产品出现次数 sumSaleCount -> 每个产品的 saleCount 值的总和。

现在,您可以直接将上述值相除:totalCount/sumSaleCount。

希望这可以帮助!如果您有不同的用例,请告诉我。

于 2016-05-31T18:20:42.907 回答