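I have a MapReduce tool that freezes on the first mapper with no obvious output. Because this is a single-node installation, I haven't been able to reach the JobTracker web interface to debug it. I get this behavior no matter how large the input file is. I've spent a whole day on this and am ready to pull my hair out. The output looks like this: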
13/09/12 15:12:14 INFO util.NativeCodeLoader: Loaded the native-hadoop library
13/09/12 15:12:14 WARN mapred.JobClient: Use GenericOptionsParser for parsing the arguments. Applications should implement Tool for the same.
13/09/12 15:12:14 INFO input.FileInputFormat: Total input paths to process : 1
13/09/12 15:12:14 INFO mapred.JobClient: Running job: job_local1132137425_0001
13/09/12 15:12:14 INFO mapred.LocalJobRunner: Waiting for map tasks
13/09/12 15:12:14 INFO mapred.LocalJobRunner: Starting task: attempt_local1132137425_0001_m_000000_0
13/09/12 15:12:14 INFO util.ProcessTree: setsid exited with exit code 0
13/09/12 15:12:14 INFO mapred.Task: Using ResourceCalculatorPlugin : org.apache.hadoop.util.LinuxResourceCalculatorPlugin@339c98d3
13/09/12 15:12:14 INFO mapred.MapTask: Processing split: file:/home/axelmagn/EclipseWorkspace/AxelMagnusonCoursework/assign-2/data/in/input.csv:0+33554432
13/09/12 15:12:14 WARN snappy.LoadSnappy: Snappy native library not loaded
13/09/12 15:12:14 INFO mapred.MapTask: io.sort.mb = 100
13/09/12 15:12:14 INFO mapred.MapTask: data buffer = 79691776/99614720
13/09/12 15:12:14 INFO mapred.MapTask: record buffer = 262144/327680
13/09/12 15:12:15 INFO mapred.JobClient: map 0% reduce 0%
13/09/12 15:12:15 INFO mapred.MapTask: Starting flush of map output
13/09/12 15:12:15 INFO mapred.MapTask: Starting flush of map output
13/09/12 15:12:20 INFO mapred.LocalJobRunner:
13/09/12 15:12:21 INFO mapred.JobClient: map 20% reduce 0%
Then it just hangs indefinitely.
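Since the job runs under LocalJobRunner, the map task threads live in the same JVM as the driver, so a thread dump should show where things are stuck. Running `jstack` against the process is one option; the sketch below is another, using only the JDK. The class name `ThreadDumpSketch` and its two methods are hypothetical helpers for illustration, not part of my tool or of Hadoop.

```java
import java.util.Map;

class ThreadDumpSketch {
    /** Formats the name, state, and stack of every live thread in this JVM. */
    static String dumpAllThreads() {
        StringBuilder sb = new StringBuilder();
        for (Map.Entry<Thread, StackTraceElement[]> entry
                : Thread.getAllStackTraces().entrySet()) {
            Thread t = entry.getKey();
            sb.append('"').append(t.getName()).append('"')
              .append(" state=").append(t.getState()).append('\n');
            for (StackTraceElement frame : entry.getValue()) {
                sb.append("    at ").append(frame).append('\n');
            }
        }
        return sb.toString();
    }

    /** Starts a daemon thread that prints one dump to stderr after delayMillis. */
    static Thread startWatchdog(final long delayMillis) {
        Thread watchdog = new Thread(new Runnable() {
            @Override
            public void run() {
                try {
                    Thread.sleep(delayMillis);
                    System.err.println(dumpAllThreads());
                } catch (InterruptedException e) {
                    // Interrupted before the delay elapsed: skip the dump.
                    Thread.currentThread().interrupt();
                }
            }
        }, "hang-watchdog");
        watchdog.setDaemon(true); // must not keep the JVM alive on its own
        watchdog.start();
        return watchdog;
    }
}
```

Calling `ThreadDumpSketch.startWatchdog(30000)` just before `job1.waitForCompletion(true)` would print the stacks about 30 seconds in, which is well after the point where the progress stops in the log above.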
The tool routine (abridged):
tempPath = new Path("/tmp/" + outDirPath.getName() + "_1_" + now);
tempPath2 = new Path("/tmp/" + outDirPath.getName() + "_2_" + now);
job1 = new VisitorCountJob(inFilePath, tempPath);
success = job1.waitForCompletion(true);
if (!success)
    throw new Exception("Visitor Count Job Failed.");
job2 = new TopVisitorJob(tempPath, outDirPath, TOPN);
success = job2.waitForCompletion(true);
return success ? 0 : 1;
The job:
public class VisitorCountJob extends Job {
    public static final String TAB = "\t";
    public VisitorCountJob(Path inputPath, Path outputPath)
            throws IOException {
        super();
        this.setJarByClass(VisitorCountJob.class);
        this.setJobName("Visitor Count");
        this.setInputFormatClass(VisitInputFormat.class);
        VisitInputFormat.setInputPaths(this, inputPath);
        FileOutputFormat.setOutputPath(this, outputPath);
        this.setMapperClass(VisitorCountMapper.class);
        this.setReducerClass(VisitorCountReducer.class);
        this.setOutputKeyClass(Person.class);
        this.setOutputValueClass(IntWritable.class);
        this.setOutputFormatClass(SequenceFileOutputFormat.class);
    }
}
The mapper:
public class VisitorCountMapper extends
        Mapper<LongWritable, Visit, Person, IntWritable> {
    @Override
    public void map(LongWritable key, Visit value, Context context)
            throws IOException, InterruptedException {
        try {
            Person visitor = value.getVisitor();
            context.write(visitor, new IntWritable(1));
        } catch (IOException e) {
            e.printStackTrace();
            throw e;
        } catch (InterruptedException e) {
            e.printStackTrace();
            throw e;
        }
    }
}
The reducer:
public class VisitorCountReducer extends
        Reducer<Person, IntWritable, Person, IntWritable> {
    @Override
    public void reduce(Person visitor, Iterable<IntWritable> values,
            Context context) throws IOException, InterruptedException {
        int count = 0;
        for (IntWritable value : values) {
            count += value.get();
        }
        context.write(visitor, new IntWritable(count));
    }
}
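I also wrote an InputFormat and a RecordReader to build Visit objects from the raw text, but for brevity I'll leave them out unless someone thinks they're relevant.
I'm really at my wit's end, so any help is greatly appreciated.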
Edit: Since there has been interest, here are some of my data type implementations:
Person:
public class Person implements WritableComparable<Person> {
    public Text firstName;
    public Text lastName;
    public Person() {}
    public Person(Text firstName, Text lastName) {
        this.firstName = firstName;
        this.lastName = lastName;
    }
    public Person(String firstName, String lastName) {
        this(new Text(firstName), new Text(lastName));
    }
    public void readFields(DataInput in) throws IOException {
        firstName.readFields(in);
        lastName.readFields(in);
    }
    public void write(DataOutput out) throws IOException {
        firstName.write(out);
        lastName.write(out);
    }
    public int compareTo(Person other) {
        int out;
        // give sorting preference to first name
        out = firstName.compareTo(other.firstName);
        if (out != 0)
            return out;
        return lastName.compareTo(other.lastName);
    }
}
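For anyone who wants to check the key type in isolation: as far as I understand it, Hadoop rebuilds Writable keys by calling the no-argument constructor and then `readFields`, so a write/read round trip is a cheap test. The sketch below mimics that without any Hadoop dependency. `Field` is a hypothetical stand-in for `Text`, and `PersonLike` copies only the shape of `Person` above; neither is my real code, and I can't say for sure that this is what causes the hang.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInput;
import java.io.DataInputStream;
import java.io.DataOutput;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;

class WritableRoundTripSketch {
    /** Hypothetical stand-in for Text: a mutable value that fills itself from a stream. */
    static class Field {
        String value = "";
        Field() {}
        Field(String value) { this.value = value; }
        void write(DataOutput out) throws IOException { out.writeUTF(value); }
        void readFields(DataInput in) throws IOException { value = in.readUTF(); }
    }

    /** Same shape as Person: the no-arg constructor leaves both fields null. */
    static class PersonLike {
        Field firstName;
        Field lastName;
        PersonLike() {}
        PersonLike(String first, String last) {
            firstName = new Field(first);
            lastName = new Field(last);
        }
        void write(DataOutput out) throws IOException {
            firstName.write(out);
            lastName.write(out);
        }
        void readFields(DataInput in) throws IOException {
            firstName.readFields(in);
            lastName.readFields(in);
        }
    }

    /**
     * Serializes the key, then rebuilds it via the no-arg constructor plus
     * readFields. Returns false if rebuilding hits a NullPointerException.
     */
    static boolean roundTrips(PersonLike original) {
        try {
            ByteArrayOutputStream bytes = new ByteArrayOutputStream();
            original.write(new DataOutputStream(bytes));
            PersonLike copy = new PersonLike();
            try {
                copy.readFields(new DataInputStream(
                        new ByteArrayInputStream(bytes.toByteArray())));
            } catch (NullPointerException e) {
                return false; // fields were never initialized
            }
            return original.firstName.value.equals(copy.firstName.value)
                    && original.lastName.value.equals(copy.lastName.value);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }
}
```

With the constructors shaped as above, `roundTrips(new PersonLike("Ada", "Lovelace"))` returns false, because `readFields` dereferences fields that the empty constructor never set. Initializing both fields in the no-arg constructor would make the same call return true.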
VisitInputFormat:
public class VisitInputFormat extends FileInputFormat<LongWritable, Visit> {
    public RecordReader<LongWritable, Visit> createRecordReader(
            InputSplit split, TaskAttemptContext context)
            throws IOException, InterruptedException {
        VisitRecordReader reader = new VisitRecordReader();
        reader.initialize(split, context);
        return reader;
    }
}
VisitRecordReader:
public class VisitRecordReader extends RecordReader<LongWritable, Visit> {
    private LineRecordReader lineReader;
    private LongWritable lineKey;
    private Text lineValue;
    public VisitRecordReader() {
        lineReader = new LineRecordReader();
    }
    public void initialize(InputSplit genericSplit, TaskAttemptContext context)
            throws IOException {
        lineReader.initialize(genericSplit, context);
    }
    public boolean nextKeyValue() throws IOException {
        return lineReader.nextKeyValue();
    }
    public LongWritable getCurrentKey() {
        return lineReader.getCurrentKey();
    }
    public Visit getCurrentValue() {
        String raw = lineReader.getCurrentValue().toString();
        return new Visit(raw);
    }
    public float getProgress() throws IOException {
        return lineReader.getProgress();
    }
    public void close() throws IOException {
        lineReader.close();
    }
}