
I need to use the results of all the reduce tasks to perform an aggregation. Basically, each reduce task produces a sum and a count of values. I need to add up all the sums and counts and compute the final average.

I tried using conf.setInt in the reducer, but when I try to access the value from the main function it fails:

public class FlightData {

public static class MyReducer 
extends Reducer<Text, Text,Text,IntWritable> {

    public void reduce(Text key, Iterable<Text> values, 
            Context context
            ) throws IOException, InterruptedException {
        int i = 0;
        int fd = 0, fc = 0;
        fd = context.getConfiguration().getInt("fd", -1);
        fc = context.getConfiguration().getInt("fc", -1);
        //fd and fc check out fine here; the updated values are seen by all reduce tasks. Only the main function doesn't seem to have access to them.
    }
}

public static void main(String[] args) throws Exception{
    Configuration conf = new Configuration();
    conf.setInt("fc", 5);
    conf.setInt("fd", 5);

    Job job = new Job(conf, "Flight Data");
    job.setJarByClass(FlightData.class);
    job.setMapperClass(TokenizerMapper.class);
    job.setReducerClass(MyReducer.class);

    job.setPartitionerClass(FirstPartitioner.class);
    job.setGroupingComparatorClass(GroupComparator.class);
    job.setSortComparatorClass(KeyComparator.class);


    job.setNumReduceTasks(10);
    job.setMapOutputKeyClass(Text.class);
    job.setMapOutputValueClass(Text.class);
    job.setOutputKeyClass(Text.class);
    job.setOutputValueClass(IntWritable.class);


    // (input/output path setup omitted)
    job.waitForCompletion(true);

    int flightCount = job.getConfiguration().getInt("fc", -1);
    int flightDelay = job.getConfiguration().getInt("fd", -1);
    //here when I access fc and fd, I get back 5 & 5 -- the values set in main, not the reducer's updates
    System.out.println("Final " + flightCount + " " + flightDelay + " " + flightDelay / flightCount);
}

2 Answers


Configuration values are copied into each task's own JVM when the job is submitted; anything a reducer sets on its copy is never sent back to the driver, which is why main can't see the update.

Instead, override run() of the mapper and the reducer using the new org.apache.hadoop.mapreduce API. In these methods you can emit the accumulated sum/count from each mapper or reducer.

You also need to limit the reducer count to 1, so as to get a global sum of the sums generated by the multiple mappers.

See the code below for more clarity:

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

public class AggregationExample extends Configured implements Tool {

    /**
     * This is the Mapper.
     * 
     */
    public static class MapJob extends Mapper<LongWritable, Text, Text, Text> {

        private Text outputKey = new Text();
        private Text outputValue = new Text();
        private double sum;

        @Override
        public void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {

            try {
                // say that you need to sum up the value part
                sum += Double.parseDouble(value.toString());
            } catch (NumberFormatException e) {
                // skip lines that are not plain numbers
            }
        }

        @Override
        public void run(Context context) throws IOException, InterruptedException {

            setup(context);
            while (context.nextKeyValue()) {
                map(context.getCurrentKey(), context.getCurrentValue(), context);
            }

            // emit out the sum per mapper
            outputKey.set(String.valueOf(sum));
            context.write(outputKey, outputValue);// Notice that the outputValue is empty
            cleanup(context);

        }
    }

    /**
     * This is the Reducer.
     * 
     */
    public static class ReduceJob extends Reducer<Text, Text, Text, Text> {

        private Text outputKey = new Text();
        private Text outputValue = new Text();
        private double sum;

        @Override
        protected void reduce(Text key, Iterable<Text> values, Context context) throws IOException,
                InterruptedException {

            // summation of the per-mapper sums; iterate over the values so that
            // identical sums from different mappers (grouped under one key) are all counted
            for (Text value : values) {
                sum += Double.parseDouble(key.toString());
            }
        }

        @Override
        public void run(Context context) throws IOException, InterruptedException {

            setup(context);
            while (context.nextKey()) {
                reduce(context.getCurrentKey(), context.getValues(), context);
            }

            // emit out the global sums
            outputKey.set(String.valueOf(sum));
            context.write(outputKey, outputValue);
            cleanup(context);
        }
    }

    @Override
    public int run(String[] args) throws Exception {

        try {
            Configuration conf = getConf();

            // output key and value separator is empty as in final output only
            // key is emitted and value is empty
            conf.set("mapred.textoutputformat.separator", "");

            // Configuring mapred to have just one reducer as we need to find
            // single sum values from all the inputs
            conf.setInt("mapred.tasktracker.reduce.tasks.maximum", 1);
            conf.setInt("mapred.reduce.tasks", 1);

            Job job = new Job(conf);

            job.setJarByClass(AggregationExample.class);
            job.setJobName("Aggregation Example");

            job.setMapperClass(MapJob.class);
            job.setReducerClass(ReduceJob.class);
            job.setOutputKeyClass(Text.class);
            job.setOutputValueClass(Text.class);

            job.setInputFormatClass(TextInputFormat.class);
            job.setOutputFormatClass(TextOutputFormat.class);
            job.setMapOutputKeyClass(Text.class);
            job.setMapOutputValueClass(Text.class);
            FileInputFormat.setInputPaths(job, args[0]);
            FileOutputFormat.setOutputPath(job, new Path(args[1]));

            boolean success = job.waitForCompletion(true);
            return success ? 0 : 1;
        } catch (Exception e) {
            e.printStackTrace();
            return 1;
        }

    }

    public static void main(String[] args) throws Exception {

        if (args.length < 2) {
            System.out
                    .println("Usage: AggregationExample <comma separated list of input directories> <output dir>");
            System.exit(-1);
        }

        int result = ToolRunner.run(new AggregationExample(), args);
        System.exit(result);
    }

}

You can map this approach onto your problem quite directly.
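
For completeness, running the packaged job would look something like this (the jar name and paths below are placeholders, not from the original post):

    hadoop jar aggregation-example.jar AggregationExample /input/flights /output/sums

Because the job is pinned to a single reducer, the output directory will contain one part file holding the global sum.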

answered 2013-02-25T18:21:15.733

Found a solution. I used counters:

http://diveintodata.org/2011/03/15/an-example-of-hadoop-mapreduce-counter/

public class FlightData {

//enum for counters used by reducers
public static enum FlightCounters {
    FLIGHT_COUNT,
    FLIGHT_DELAY;
}
public static class MyReducer 
extends Reducer<Text, Text,Text,IntWritable> {

    public void reduce(Text key, Iterable<Text> values, 
            Context context
            ) throws IOException, InterruptedException {


        // origin[] and dest[] come from parsing the input values (parsing omitted in the post)
        float delay1 = Float.parseFloat(origin[5]);
        float delay2 = Float.parseFloat(dest[5]);
        context.getCounter(FlightCounters.FLIGHT_COUNT).increment(1);
        context.getCounter(FlightCounters.FLIGHT_DELAY)
        .increment((long) (delay1 + delay2));

    }
}
public static void main(String[] args) throws Exception{
    float flightCount, flightDelay;
    // ... job setup as in the question above ...
    job.waitForCompletion(true);
    //get the final results updated in counter by all map and reduce tasks
    flightCount = job.getCounters()
            .findCounter(FlightCounters.FLIGHT_COUNT).getValue();
    flightDelay = job.getCounters()
            .findCounter(FlightCounters.FLIGHT_DELAY).getValue();
}

}
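
The average the question asked for is then just a division in the driver once the counters are read back. A minimal sketch reusing the variables from the main above:

    //compute the final average from the framework-aggregated counters
    if (flightCount > 0) {
        System.out.println("Average delay: " + flightDelay / flightCount);
    }

Counters are aggregated by the framework across all map and reduce tasks, which gives exactly the cross-task visibility that plain Configuration values lack.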

answered 2013-02-28T17:54:17.163