0

我在 Hadoop 中有以下代码,其中 mapper 和 reducer 如下:

public static class Map2 extends Mapper<LongWritable, Text, NullWritable, Text> 
{
    TreeMap<Text, Text> top10 = new TreeMap<Text, Text>();
    HashMap<String, String> userInfo = new HashMap<String, String>();

    public void setup(Context context) throws IOException, InterruptedException
    {
        try
        {
            URI[] uris = DistributedCache.getCacheFiles(context.getConfiguration());
            FileSystem fs = FileSystem.get(context.getConfiguration());

            if (uris == null || uris.length == 0) 
            {
                throw new IOException("Error reading file from distributed cache. No URIs found.");
            }

            String path = "./users.dat";

            fs.copyToLocalFile(new Path(uris[0]), new Path(path));

            BufferedReader br = new BufferedReader(new FileReader(path));

            String line = null;

            while((line = br.readLine()) != null)
            {
                String split[] = line.split("\\::");
                String age = split[2];
                String gender = split[1];

                userInfo.put(split[0], gender + "\t" + age);
            }
            br.close();
        }
        catch(Exception e)
        {

        }
    }

    public void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException 
    {
        try
        {
            String line = value.toString();

            int sum = Integer.parseInt(line.split("\\t")[1]);
            String userID = line.split("\\t")[0];

            String newKey = sum + " " + userID;

            if(userInfo.containsKey(userID))
            {
                String record = userInfo.get(userID);
                String val = userID + "\t" + record + "\t" + sum;

                top10.put(new Text(newKey), new Text(val));

                if (top10.size() > 10)
                {
                    top10.remove(top10.firstKey());
                }
            }
        }

        catch(Exception e)
        {

        }
    }

    protected void cleanup(Context context) throws IOException, InterruptedException 
    {
        try
        {
            for (Text s1 : top10.descendingMap().values()) 
            {
                context.write(NullWritable.get(), s1);
            }
        }
        catch(Exception e)
        {

        }
    }
}

public static class Reduce2 extends Reducer<NullWritable, Text, NullWritable, Text> 
{
    private TreeMap<Text, Text> top10 = new TreeMap<Text, Text>();

    public void reduce(NullWritable key, Iterable<Text> values, Context context) throws IOException, InterruptedException 
    {
        try
        {
            String line = values.toString();

            String sum = line.split("\\t")[3];
            String userID = line.split("\\t")[0];
            String gender = line.split("\\t")[1];
            String age = line.split("\\t")[2];

            String newKey = sum + " " + userID;

            String val = userID + "\t" + gender + "\t" + age + "\t" + sum;

            top10.put(new Text(newKey), new Text(val));

            if(top10.size() > 10)
            {
                top10.remove(top10.firstKey());
            }
        }

        catch(Exception e)
        {

        }
    }

    protected void cleanup(Context context) throws IOException, InterruptedException 
    {
        try
        {
            for (Text s1 : top10.descendingMap().values()) 
            {
                context.write(NullWritable.get(), s1);
            }
        }
        catch(Exception e)
        {

        }
    }
}

驱动方法如下:

Configuration conf2 = new Configuration();
        // Register the user side file before the Job is created; Map2.setup()
        // reads it back as the first distributed-cache entry.
        DistributedCache.addCacheFile(new Path("/Spring2014_HW-1/input_HW-1/users.dat").toUri(), conf2);

        Job job2 = new Job(conf2, "Phase2");

        job2.setOutputKeyClass(NullWritable.class);
        job2.setOutputValueClass(Text.class);

        job2.setJarByClass(MapSideJoin.class);

        job2.setMapperClass(Map2.class);
        job2.setReducerClass(Reduce2.class);

        job2.setInputFormatClass(TextInputFormat.class);
        job2.setOutputFormatClass(TextOutputFormat.class);

        FileInputFormat.addInputPath(job2, new Path(args[1]));
        FileOutputFormat.setOutputPath(job2, new Path(args[2]));

        // Reduce2 accumulates its ranking in task-local state and writes it in
        // cleanup(), so a single reduce task must see every candidate record
        // for the output to be one global top 10.
        job2.setNumReduceTasks(1);

        // Surface a failed job instead of silently continuing.
        if (!job2.waitForCompletion(true))
        {
            throw new IllegalStateException("Job Phase2 failed");
        }

尽管我已经在 reducer 中写出了输出,作业计数器却显示 map output records = 10 而 reduce output records = 0。reducer 的输出到哪里去了?

谢谢。

4

0 回答 0