1

我正在尝试使用 MapReduce 和随机数生成来估算 pi 的值。我为作业提供了 50 个小文件,以便启动 50 个 map 任务。每个文件中只包含一个“hello”。我的 Mapper 阶段的输出有 50 行,如下所示:

“hello” 1 “hello” 2 “hello” 3 ...

当然,这不是我想要的结果。我的程序显然没有像我希望的那样忽略输入。输入的唯一用途应该是为每个文件启动一个 map 任务。

下面是我的 MapReduce 代码。目前,我在运行 jar 文件时通过 -D 命令行参数跳过了 reduce 阶段。

import java.util.Random;
import java.io.IOException;
import java.lang.InterruptedException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.FloatWritable;
import org.apache.hadoop.io.DoubleWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper.Context;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.KeyValueTextInputFormat;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

    public class PiCalc extends Configured implements Tool {

        public static class MapClass extends Mapper<IntWritable, IntWritable, DoubleWritable, DoubleWritable> {
//          private FloatWritable insideCircle = new FloatWritable();
//          private FloatWritable insideSquare = new FloatWritable();
            public void map(IntWritable key, DoubleWritable value, Context context) throws IOException, InterruptedException {

            int ITERATIONS = 1000;                                            
            int inCircle = 0;
            int n = 0;

             double x, y;                                                 

             for (int i = 0; i < ITERATIONS ; i++)                         
             {                                                           
                x = Math.random();                       
                y = Math.random();                                               

                n++;                                                       

                if ( x*x + y*y <= 1 ){             
                   inCircle++;   
                }       
             }   

//           insideCircle.set(inCircle);
//           insideSquare.set(n);

//           insideCircle.set(99);
//           insideCircle.set(88);

             context.write(new DoubleWritable(inCircle), new DoubleWritable(n));
            }
        }   

        public static class Reduce extends Reducer<FloatWritable, FloatWritable, Text, FloatWritable> {

            public void reduce(Iterable<FloatWritable> key, Iterable<FloatWritable> values, Context context) 
                    throws IOException, InterruptedException {

                float pi = 0;
                float numerator = 0;
                float denominator = 0;

                for (FloatWritable k:key){   
                    numerator += k.get();      
                }

                for (FloatWritable val:values){  
                    denominator += val.get();        
                }

                pi = 4 * numerator / denominator;

                context.write(new Text("Pi: "), new FloatWritable(pi));
            }
        }   
        public int run(String[] args) throws Exception {
            Configuration conf = getConf();      
//          conf.set("key.value.separator.in.input.line", ",");
            Job job = new Job(conf, "PiCalc");        
            job.setJarByClass(PiCalc.class);            
            Path in = new Path(args[0]);
            Path out = new Path(args[1]);
            FileInputFormat.setInputPaths(job, in);
            FileOutputFormat.setOutputPath(job, out);
            job.setJobName("PiCalc");     
            job.setMapperClass(MapClass.class);
            job.setReducerClass(Reduce.class);   
            job.setInputFormatClass(KeyValueTextInputFormat.class);
            job.setOutputFormatClass(TextOutputFormat.class);
            job.setOutputKeyClass(Text.class);
            job.setOutputValueClass(Text.class);
//          job.setNumMapTasks(50);
            System.exit(job.waitForCompletion(true)?0:1);
            return 0;
        }  
        public static void main(String[] args) throws Exception { 
            int res = ToolRunner.run(new Configuration(), new PiCalc(), args);       
            System.exit(res);
        }
    }

任何建议将不胜感激。谢谢!

4

0 回答 0