我需要使用 map reduce 来实现一个功能。
要求在下面提到。
- 映射器的输入是一个包含两列 productId 和 Salescount 的文件
- 减速机产量,销售额总和
要求是我需要计算 salescount / sum(salescount)。
为此,我计划使用嵌套映射减少。但是对于第二个映射器,我需要使用第一个 reducers 输出和第一个 map 的输入。
我该如何实现这一点。或者有什么替代方法吗?
问候维努
您可以按照自己的方式使用ChainMapper
和ChainReducer
PIPE Mappers 和 Reducers。请看这里
以下将类似于您需要实现的代码片段
JobConf mapBConf = new JobConf(false);
JobConf reduceConf = new JobConf(false);
ChainMapper.addMapper(conf, FirstMapper.class, FirstMapperInputKey.class, FirstMapperInputValue.class,
FirstMapperOutputKey.class, FirstMapperOutputValue.class, false, mapBConf);
ChainReducer.setReducer(conf, FirstReducer.class, FirstMapperOutputKey.class, FirstMapperOutputValue.class,
FirstReducerOutputKey.class, FirstReducerOutputValue.class, true, reduceConf);
ChainReducer.addMapper(conf, SecondMapper.class, FirstReducerOutputKey.class, FirstReducerOutputValue.class,
SecondMapperOutputKey.class, SecondMapperOutputValue.class, false, null);
ChainReducer.setReducer(conf, SecondReducer.class, SecondMapperOutputKey.class, SecondMapperOutputValue.class, SecondReducerOutputKey.class, SecondReducerOutputValue.class, true, reduceConf);
或者如果您不想使用多个 Mapper 和 Reducers,您可以执行以下操作
public static class ProductIndexerMapper extends MapReduceBase implements Mapper<LongWritable, Text, Text, LongWritable> {
private static Text productId = new Text();
private static LongWritable salesCount = new LongWritable();
@Override
public void map(LongWritable key, Text value,
OutputCollector<Text, LongWritable> output, Reporter reporter)
throws IOException {
String[] values = value.toString().split("\t");
productId.set(values[0]);
salesCount.set(Long.parseLong(values[1]));
output.collect(productId, salesCount);
}
}
public static class ProductIndexerReducer extends MapReduceBase implements Reducer<Text, LongWritable, Text, LongWritable> {
private static LongWritable productWritable = new LongWritable();
@Override
public void reduce(Text key, Iterator<LongWritable> values,
OutputCollector<Text, LongWritable> output, Reporter reporter)
throws IOException {
List<LongWritable> items = new ArrayList<LongWritable>();
long total = 0;
LongWritable item = null;
while(values.hasNext()) {
item = values.next();
total += item.get();
items.add(item);
}
Iterator<LongWritable> newValues = items.iterator();
while(newValues.hasNext()) {
productWritable.set(newValues.next().get()/total);
output.collect(key, productWritable);
}
}
}
`
有了这个用例,我相信我们不需要两个不同的 mappers/mapreduce 作业来实现这一点。(作为上述评论中给出的答案的扩展)
假设您有一个非常大的输入文件,在 HDFS 中拆分为多个块。当您使用此文件作为输入触发 MapReduce 作业时,多个映射器(等于输入块的数量)将开始并行执行。
在您的映射器实现中,从输入中读取每一行并将 productId 作为键和 saleCount 作为值写入上下文。该数据被传递给Reducer。
我们知道,在一个 MR 作业中,所有具有相同 key 的数据都会传递给同一个 reducer。现在,在您的 reducer 实现中,您可以计算特定 productId 的所有 saleCounts 的总和。
注意:我不确定分子中的值“salescount”。
假设它是特定产品出现次数的计数,请使用计数器在计算 SUM(saleCount) 的同一 for 循环中添加并获取总销售计数。所以,我们有
totalCount -> 产品出现次数 sumSaleCount -> 每个产品的 saleCount 值的总和。
现在,您可以直接将上述值相除:totalCount/sumSaleCount。
希望这可以帮助!如果您有不同的用例,请告诉我。