I want to join two files into one. I wrote two mappers to read the inputs and one reducer to do the join.

    JobConf classifiedConf = new JobConf(new Configuration());
    classifiedConf.setJarByClass(myjob.class);
    classifiedConf.setJobName("classifiedjob");
    FileInputFormat.setInputPaths(classifiedConf, classifiedInputPath);
    classifiedConf.setMapperClass(ClassifiedMapper.class);
    classifiedConf.setMapOutputKeyClass(TextPair.class);
    classifiedConf.setMapOutputValueClass(Text.class);
    Job classifiedJob = new Job(classifiedConf);
    // first mapper config

    JobConf featureConf = new JobConf(new Configuration());
    featureConf.setJobName("featureJob");
    featureConf.setJarByClass(myjob.class);
    FileInputFormat.setInputPaths(featureConf, featuresInputPath);
    featureConf.setMapperClass(FeatureMapper.class);
    featureConf.setMapOutputKeyClass(TextPair.class);
    featureConf.setMapOutputValueClass(Text.class);
    Job featureJob = new Job(featureConf);
    // second mapper config

    JobConf joinConf = new JobConf(new Configuration());
    joinConf.setJobName("joinJob");
    joinConf.setJarByClass(myjob.class);
    joinConf.setReducerClass(JoinReducer.class);
    joinConf.setOutputKeyClass(Text.class);
    joinConf.setOutputValueClass(Text.class);
    Job joinJob = new Job(joinConf);
    // reducer config
    // JobControl config
    joinJob.addDependingJob(featureJob);
    joinJob.addDependingJob(classifiedJob);
    secondJob.addDependingJob(joinJob);
    JobControl jobControl = new JobControl("jobControl");
    jobControl.addJob(classifiedJob);
    jobControl.addJob(featureJob);
    jobControl.addJob(secondJob);

    Thread thread = new Thread(jobControl);
    thread.start();
    while (jobControl.allFinished()) {
        jobControl.stop();
    }

However, I get these messages:

WARN mapred.JobClient: Use GenericOptionsParser for parsing the arguments. Applications should implement Tool for the same.
WARN mapred.JobClient: No job jar file set.  User classes may not be found. See JobConf(Class) or JobConf#setJar(String).

Can anyone please help?

3 Answers

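Which version of Hadoop are you using?

Do the warnings actually stop the program?

You don't need to call setJarByClass(). In my snippet below I run the job without it, because the JobConf(Class) constructor already locates the job jar from the class passed to it.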

JobConf job = new JobConf(PageRankJob.class);
job.setJobName("PageRankJob");

job.setOutputKeyClass(Text.class);
job.setOutputValueClass(Text.class);

job.setMapperClass(PageRankMapper.class);
job.setReducerClass(PageRankReducer.class);

job.setInputFormat(TextInputFormat.class);
job.setOutputFormat(TextOutputFormat.class);

FileInputFormat.setInputPaths(job, new Path(args[0]));
FileOutputFormat.setOutputPath(job, new Path(args[1]));

JobClient.runJob(job);

answered 2013-01-08T14:53:16.743

You should implement your job this way:

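import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.mapred.FileInputFormat;
import org.apache.hadoop.mapred.FileOutputFormat;
import org.apache.hadoop.mapred.JobClient;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

public class MyApp extends Configured implements Tool {

    public int run(String[] args) throws Exception {
      // Configuration processed by ToolRunner
      Configuration conf = getConf();

      // Create a JobConf using the processed conf; passing MyApp.class sets the job jar
      JobConf job = new JobConf(conf, MyApp.class);

      // Process custom command-line options (generic options were already removed by ToolRunner)
      Path in = new Path(args[0]);
      Path out = new Path(args[1]);

      // Specify various job-specific parameters
      job.setJobName("my-app");
      FileInputFormat.setInputPaths(job, in);
      FileOutputFormat.setOutputPath(job, out);
      // MyMapper and MyReducer are placeholders for your own classes
      job.setMapperClass(MyMapper.class);
      job.setReducerClass(MyReducer.class);

      // Submit the job, then poll for progress until the job is complete
      JobClient.runJob(job);
      return 0;
    }

    public static void main(String[] args) throws Exception {
      // Let ToolRunner handle generic command-line options
      int res = ToolRunner.run(new Configuration(), new MyApp(), args);

      System.exit(res);
    }
}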

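This is based on the example in Hadoop's documentation.

So basically your job class needs to extend Configured and implement Tool, which forces you to implement run(). Then launch the job from your main class with ToolRunner.run(<your job>, <args>), and the GenericOptionsParser warning will go away.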

answered 2013-01-08T16:06:03.737

You need this line in your driver: job.setJarByClass(MapperClassName.class);
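
To illustrate why the class you pass matters, here is a rough plain-Java sketch of how a jar can be located from a class. It assumes Hadoop resolves the job jar in a broadly similar way, by looking up the class file on the classpath and checking whether it sits inside a jar; that is my understanding, not something confirmed in this thread. JarLocator and its methods are hypothetical names for illustration only, not Hadoop APIs.

```java
import java.io.IOException;
import java.net.URL;
import java.util.Enumeration;

// Hypothetical helper, not part of Hadoop: finds the jar a class was loaded from.
final class JarLocator {

    // Resource name of a class file, e.g. java.lang.String -> java/lang/String.class
    static String classFileName(Class<?> cls) {
        return cls.getName().replace('.', '/') + ".class";
    }

    // Extracts the jar path from a URL such as jar:file:/path/app.jar!/pkg/A.class.
    // Returns null when the URL does not point inside a jar.
    static String jarPathFromUrl(String url) {
        if (!url.startsWith("jar:")) {
            return null;
        }
        String path = url.substring("jar:".length());
        if (path.startsWith("file:")) {
            path = path.substring("file:".length());
        }
        int bang = path.indexOf('!');
        return bang >= 0 ? path.substring(0, bang) : path;
    }

    // Returns the path of the jar containing cls, or null if it was not loaded from a jar.
    static String findContainingJar(Class<?> cls) throws IOException {
        ClassLoader loader = cls.getClassLoader();
        if (loader == null) {
            return null; // classes loaded by the bootstrap loader have no class loader
        }
        Enumeration<URL> urls = loader.getResources(classFileName(cls));
        while (urls.hasMoreElements()) {
            String jar = jarPathFromUrl(urls.nextElement().toString());
            if (jar != null) {
                return jar;
            }
        }
        return null;
    }

    public static void main(String[] args) throws IOException {
        String jar = findContainingJar(JarLocator.class);
        System.out.println(jar == null
                ? "JarLocator was not loaded from a jar"
                : "JarLocator was loaded from " + jar);
    }
}
```

If the lookup finds no jar, for example when the driver runs from a classes directory in an IDE rather than from a packaged jar, there is nothing to ship to the cluster. Under the assumption above, that would explain seeing "No job jar file set" even with setJarByClass() in the code.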

answered 2014-05-02T18:06:24.910