I am trying to estimate pi with MapReduce and random number generation. I feed the job 50 small files so that it launches 50 map tasks; each file contains nothing but the word "hello". The output of my Mapper stage shows 50 lines that look like this:
"hello" 1
"hello" 2
"hello" 3
...
That is obviously not what I had in mind. My program is clearly not ignoring the input the way I hoped it would. The only purpose the input is supposed to serve is to start one map task per file.
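For reference, what I expected each map task to emit is a single record holding the in-circle count and the total sample count computed in the map() body, so the mapper output should look roughly like the line below, once per task (786.0 is only an illustrative value; the exact count varies with the random draws):
786.0	1000.0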
Below is my MapReduce code. For now I am skipping the reduce phase by passing a -D command-line argument when I run the jar.
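(As far as I understand, the equivalent inside run() would be the single call below; I am setting the number of reduces from the command line instead, so treat this line as an untested sketch.)
job.setNumReduceTasks(0);  // map-only job: mapper output is written directly by the output format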
import java.util.Random;
import java.io.IOException;
import java.lang.InterruptedException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.FloatWritable;
import org.apache.hadoop.io.DoubleWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper.Context;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.KeyValueTextInputFormat;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;
public class PiCalc extends Configured implements Tool {

    public static class MapClass extends Mapper<IntWritable, IntWritable, DoubleWritable, DoubleWritable> {
        // private FloatWritable insideCircle = new FloatWritable();
        // private FloatWritable insideSquare = new FloatWritable();

        public void map(IntWritable key, DoubleWritable value, Context context) throws IOException, InterruptedException {
            int ITERATIONS = 1000;
            int inCircle = 0;
            int n = 0;
            double x, y;
            // Monte Carlo sampling: draw random points in the unit square and
            // count how many land inside the quarter circle x*x + y*y <= 1.
            for (int i = 0; i < ITERATIONS; i++) {
                x = Math.random();
                y = Math.random();
                n++;
                if (x*x + y*y <= 1) {
                    inCircle++;
                }
            }
            // insideCircle.set(inCircle);
            // insideSquare.set(n);
            // insideCircle.set(99);
            // insideCircle.set(88);
            context.write(new DoubleWritable(inCircle), new DoubleWritable(n));
        }
    }
    public static class Reduce extends Reducer<FloatWritable, FloatWritable, Text, FloatWritable> {

        public void reduce(Iterable<FloatWritable> key, Iterable<FloatWritable> values, Context context)
                throws IOException, InterruptedException {
            float pi = 0;
            float numerator = 0;
            float denominator = 0;
            for (FloatWritable k : key) {
                numerator += k.get();
            }
            for (FloatWritable val : values) {
                denominator += val.get();
            }
            // The fraction of points inside the quarter circle approximates pi/4,
            // so pi is estimated as 4 * (points in circle) / (total points).
            pi = 4 * numerator / denominator;
            context.write(new Text("Pi: "), new FloatWritable(pi));
        }
    }
    public int run(String[] args) throws Exception {
        Configuration conf = getConf();
        // conf.set("key.value.separator.in.input.line", ",");
        Job job = new Job(conf, "PiCalc");
        job.setJarByClass(PiCalc.class);

        Path in = new Path(args[0]);
        Path out = new Path(args[1]);
        FileInputFormat.setInputPaths(job, in);
        FileOutputFormat.setOutputPath(job, out);

        job.setJobName("PiCalc");
        job.setMapperClass(MapClass.class);
        job.setReducerClass(Reduce.class);
        job.setInputFormatClass(KeyValueTextInputFormat.class);
        job.setOutputFormatClass(TextOutputFormat.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(Text.class);
        // job.setNumMapTasks(50);

        System.exit(job.waitForCompletion(true) ? 0 : 1);
        return 0;
    }

    public static void main(String[] args) throws Exception {
        int res = ToolRunner.run(new Configuration(), new PiCalc(), args);
        System.exit(res);
    }
}
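In case it helps to see what I was picturing, here is a minimal sketch of a mapper whose generic types match the Text/Text pairs that KeyValueTextInputFormat hands to each map() call and that simply ignores them. The class name MapSketch is just a placeholder, and I have not verified that this is where my problem lies:

public static class MapSketch extends Mapper<Text, Text, DoubleWritable, DoubleWritable> {

    private static final int ITERATIONS = 1000;

    @Override  // the compiler would flag a map() signature that does not actually override
    public void map(Text key, Text value, Context context)
            throws IOException, InterruptedException {
        // Ignore the input entirely; just sample ITERATIONS random points
        // and count how many fall inside the quarter circle.
        int inCircle = 0;
        for (int i = 0; i < ITERATIONS; i++) {
            double x = Math.random();
            double y = Math.random();
            if (x * x + y * y <= 1) {
                inCircle++;
            }
        }
        context.write(new DoubleWritable(inCircle), new DoubleWritable(ITERATIONS));
    }
}

I realize the job's output key/value classes would also have to agree with whatever the mapper actually emits; I have left that part out of the sketch.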
Any suggestions would be much appreciated. Thanks!