0

我有一个包含 1264 条记录的 SequenceFile。每个键对于每条记录都是唯一的。我的问题是我的映射器似乎正在读取这个文件两次或者它被读取了两次。为了进行完整性检查,我编写了一个小实用程序类来读取 SequenceFile,实际上,只有 1264 条记录(即 SequenceFile.Reader)。

在我的 reducer 中,每个 Iterable 我应该只获得 1 条记录,但是,当我遍历 iterable(Iterator)时,我每个 Key 获得 2 条记录(每个 Key 总是 2 条,而不是每个 Key 1 或 3 条或其他东西)。

我的工作的日志输出如下。我不知道为什么,但为什么“要处理的总输入路径”是 2?当我运行我的作业时,我尝试了 -Dmapred.input.dir=/data 和 -Dmapred.input.dir=/data/part-r-00000,但处理的总路径仍然是 2。

任何想法表示赞赏。

12/03/01 05:28:30 INFO jvm.JvmMetrics: Initializing JVM Metrics with processName=JobTracker, sessionId=
12/03/01 05:28:30 INFO input.FileInputFormat: Total input paths to process : 2
12/03/01 05:28:31 INFO mapred.JobClient: Running job: job_local_0001
12/03/01 05:28:31 INFO input.FileInputFormat: Total input paths to process : 2
12/03/01 05:28:31 INFO mapred.MapTask: io.sort.mb = 100
12/03/01 05:28:31 INFO mapred.MapTask: data buffer = 79691776/99614720
12/03/01 05:28:31 INFO mapred.MapTask: record buffer = 262144/327680
12/03/01 05:28:31 INFO mapred.MapTask: Starting flush of map output
12/03/01 05:28:31 INFO mapred.MapTask: Finished spill 0
12/03/01 05:28:31 INFO mapred.TaskRunner: Task:attempt_local_0001_m_000000_0 is done. And is in the process of commiting
12/03/01 05:28:31 INFO mapred.LocalJobRunner:
12/03/01 05:28:31 INFO mapred.TaskRunner: Task 'attempt_local_0001_m_000000_0' done.
12/03/01 05:28:31 INFO mapred.MapTask: io.sort.mb = 100
12/03/01 05:28:31 INFO mapred.MapTask: data buffer = 79691776/99614720
12/03/01 05:28:31 INFO mapred.MapTask: record buffer = 262144/327680
12/03/01 05:28:31 INFO mapred.MapTask: Starting flush of map output
12/03/01 05:28:31 INFO mapred.MapTask: Finished spill 0
12/03/01 05:28:31 INFO mapred.TaskRunner: Task:attempt_local_0001_m_000001_0 is done. And is in the process of commiting
12/03/01 05:28:31 INFO mapred.LocalJobRunner:
12/03/01 05:28:31 INFO mapred.TaskRunner: Task 'attempt_local_0001_m_000001_0' done.
12/03/01 05:28:31 INFO mapred.LocalJobRunner:
12/03/01 05:28:31 INFO mapred.Merger: Merging 2 sorted segments
12/03/01 05:28:31 INFO mapred.Merger: Down to the last merge-pass, with 2 segments left of total size: 307310 bytes
12/03/01 05:28:31 INFO mapred.LocalJobRunner:
12/03/01 05:28:32 INFO mapred.TaskRunner: Task:attempt_local_0001_r_000000_0 is done. And is in the process of commiting
12/03/01 05:28:32 INFO mapred.LocalJobRunner:
12/03/01 05:28:32 INFO mapred.TaskRunner: Task attempt_local_0001_r_000000_0 is allowed to commit now
12/03/01 05:28:32 INFO mapred.JobClient:  map 100% reduce 0%
12/03/01 05:28:32 INFO output.FileOutputCommitter: Saved output of task 'attempt_local_0001_r_000000_0' to results
12/03/01 05:28:32 INFO mapred.LocalJobRunner: reduce > reduce
12/03/01 05:28:32 INFO mapred.TaskRunner: Task 'attempt_local_0001_r_000000_0' done.
12/03/01 05:28:33 INFO mapred.JobClient:  map 100% reduce 100%
12/03/01 05:28:33 INFO mapred.JobClient: Job complete: job_local_0001
12/03/01 05:28:33 INFO mapred.JobClient: Counters: 12
12/03/01 05:28:33 INFO mapred.JobClient:   FileSystemCounters
12/03/01 05:28:33 INFO mapred.JobClient:     FILE_BYTES_READ=1320214
12/03/01 05:28:33 INFO mapred.JobClient:     FILE_BYTES_WRITTEN=1275041
12/03/01 05:28:33 INFO mapred.JobClient:   Map-Reduce Framework
12/03/01 05:28:33 INFO mapred.JobClient:     Reduce input groups=1264
12/03/01 05:28:33 INFO mapred.JobClient:     Combine output records=0
12/03/01 05:28:33 INFO mapred.JobClient:     Map input records=2528
12/03/01 05:28:33 INFO mapred.JobClient:     Reduce shuffle bytes=0
12/03/01 05:28:33 INFO mapred.JobClient:     Reduce output records=2528
12/03/01 05:28:33 INFO mapred.JobClient:     Spilled Records=5056
12/03/01 05:28:33 INFO mapred.JobClient:     Map output bytes=301472
12/03/01 05:28:33 INFO mapred.JobClient:     Combine input records=0
12/03/01 05:28:33 INFO mapred.JobClient:     Map output records=2528
12/03/01 05:28:33 INFO mapred.JobClient:     Reduce input records=2528

我的映射器类非常简单。它读入一个文本文件。对于每一行,它将“m”附加到该行。

public class MyMapper extends Mapper<LongWritable, Text, LongWritable, Text> {

 private static final Log _log = LogFactory.getLog(MyMapper.class);

 @Override
 public void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
  String s = (new StringBuilder()).append(value.toString()).append("m").toString();
  context.write(key, new Text(s));
  _log.debug(key.toString() + " => " + s);
 }
}

我的减速器类也很简单。它只是将“r”附加到该行。

public class MyReducer extends Reducer<LongWritable, Text, LongWritable, Text> {

private static final Log _log = LogFactory.getLog(MyReducer.class);

@Override
public void reduce(LongWritable key, Iterable<Text> values, Context context) throws IOException, InterruptedException {
 for(Iterator<Text> it = values.iterator(); it.hasNext();) {
  Text txt = it.next();
  String s = (new StringBuilder()).append(txt.toString()).append("r").toString();
  context.write(key, new Text(s));
  _log.debug(key.toString() + " => " + s);
  }
 }
}

我的工作课程如下。

public class MyJob extends Configured implements Tool {

public static void main(String[] args) throws Exception {
 ToolRunner.run(new Configuration(), new MyJob(), args);
}

@Override
public int run(String[] args) throws Exception {
 Configuration conf = getConf();
 Path input = new Path(conf.get("mapred.input.dir"));
 Path output = new Path(conf.get("mapred.output.dir"));

 System.out.println("input = " + input);
 System.out.println("output = " + output);

 Job job = new Job(conf, "dummy job");
 job.setMapOutputKeyClass(LongWritable.class);
 job.setMapOutputValueClass(Text.class);
 job.setOutputKeyClass(LongWritable.class);
 job.setOutputValueClass(Text.class);

 job.setMapperClass(MyMapper.class);
 job.setReducerClass(MyReducer.class);

 FileInputFormat.addInputPath(job, input);
 FileOutputFormat.setOutputPath(job, output);

 job.setJarByClass(MyJob.class);

 return job.waitForCompletion(true) ? 0 : 1;
 }
}

我的输入数据如下所示。

T, T
T, T
T, T
F, F
F, F
F, F
F, F
T, F
F, T

运行我的作业后,我得到如下输出。

0   T, Tmr
0   T, Tmr
6   T, Tmr
6   T, Tmr
12  T, Tmr
12  T, Tmr
18  F, Fmr
18  F, Fmr
24  F, Fmr
24  F, Fmr
30  F, Fmr
30  F, Fmr
36  F, Fmr
36  F, Fmr
42  T, Fmr
42  T, Fmr
48  F, Tmr
48  F, Tmr

我在设置工作时做错了什么吗?我尝试了以下方法来运行我的作业,并且在这种方法中,文件只被读取一次。为什么是这样?System.out.println(inpath) 和 System.out.println(outpath) 的值是相同的!帮助?

public class MyJob2 {

public static void main(String[] args) throws Exception {
    Configuration conf = new Configuration();
    String[] otherArgs = new GenericOptionsParser(conf, args).getRemainingArgs();
    if (otherArgs.length != 2) {
      System.err.println("Usage: MyJob2 <in> <out>");
      System.exit(2);
    }

    String sInput = args[0];
    String sOutput = args[1];

    Path input = new Path(sInput);
    Path output = new Path(sOutput);

    System.out.println("input = " + input);
    System.out.println("output = " + output);

    Job job = new Job(conf, "dummy job");
    job.setMapOutputKeyClass(LongWritable.class);
    job.setMapOutputValueClass(Text.class);
    job.setOutputKeyClass(LongWritable.class);
    job.setOutputValueClass(Text.class);

    job.setMapperClass(MyMapper.class);
    job.setReducerClass(MyReducer.class);

    FileInputFormat.addInputPath(job, input);
    FileOutputFormat.setOutputPath(job, output);

    job.setJarByClass(MyJob2.class);

    int result = job.waitForCompletion(true) ? 0 : 1;
    System.exit(result);
 }
}
4

2 回答 2

3

我从 hadoop 邮件列表中获得了帮助。我的问题在于下面的行。

FileInputFormat.addInputPath(job, input);

这一行只是将输入附加回配置。在注释掉这一行之后,输入文件现在只读取一次。事实上,我也注释掉了另一行,

FileOutputFormat.setOutputPath(job, output);

一切仍然有效。

于 2012-03-06T13:55:12.057 回答
0

我遇到过类似的问题,但出于不同的原因:linux 显然创建了我的输入文件 (~input.txt) 的隐藏副本,所以这是获得此错误的第二种方法..

于 2014-12-19T13:14:36.023 回答