1

我正在使用 Elastic Map Reduce 在 Amazon WebServices 上实施 PageRanking。以下是我的代码。问题是 FileSystem API(使用 Hadoop 2.2.0)中的重命名和删除方法似乎没有完成他们的工作。

我错过了什么吗?

    public class PageRankDriver{
private static Logger log = Logger.getLogger(PageRankDriver.class.toString()); 
public static void main(String[] args) throws IOException
{
    if (args.length != 4||Integer.parseInt(args[3])<0)
    {
        System.err.println("Usage: PageRank <nodes path> <edges path> <output path> <# of iterations> ");
        System.exit(-1);
    }

    /*
     * Initial config
     */

    JobConf conf = new JobConf(PageRankDriver.class);

    log.info("Test Debug 1");


    FileSystem fs= FileSystem.get(URI.create(args[0]),conf);

    String[] otherArgs = new GenericOptionsParser(conf, args).getRemainingArgs();

    /*
     * Get the number of nodes
     */
    FSDataInputStream nodesFile = fs.open(new Path(otherArgs[0]));
    int count = 0;
    while (nodesFile.readLine() != null)
    {
        count++;
    }
    conf.set("NODE_COUNT", count + "");

    //delete old output folder
    fs.delete(new Path(otherArgs[2]),true);

    conf.setJobName("PageRank");
    conf.setInputFormat(TextInputFormat.class);
    conf.setOutputFormat(TextOutputFormat.class);
    TextOutputFormat.setOutputPath(conf, new Path(otherArgs[2]));
    TextInputFormat.setInputPaths(conf, new Path(otherArgs[1]));
    conf.setMapperClass(PageRankFirstMapper.class);
    conf.setReducerClass(PageRankFirstReducer.class);
    log.info("----"+TextOutputFormat.getOutputPath(conf).getName());
    conf.setBoolean("mapred.output.compress", false);

    conf.setOutputKeyClass(Text.class);
    conf.setOutputValueClass(Text.class);

    conf.setJarByClass(PageRankDriver.class);
    conf.setJar("PageRank.jar");

    JobClient.runJob(conf);
    log.info("Test Debug 2");
    //This is used to adjust the path for hdfs or local machine
    String pre="";  
    //||fs.getWorkingDirectory().toString().contains("hadoop");s3://awsee599
    if(fs.getWorkingDirectory().toString().contains("hdfs")||fs.getWorkingDirectory().toString().contains("hadoop"))
    {
        pre="s3://awsee599/";
    }
    /*Path src=new Path(otherArgs[2]+"/part-00000");
    Path dst=new Path(pre);
    fs.copyToLocalFile(src, dst);*/
    boolean test1=fs.rename(new Path(otherArgs[2]+"/part-00000"), new Path(pre+"temp.txt"));
    log.info("Test Debug 3------------"+pre+" Working directory::"+fs.getWorkingDirectory().toString()+" Rename----------"+test1);
    boolean test2=fs.delete(new Path(otherArgs[2]),true);
    log.info("Test Debug 3------------"+pre+" Working directory::"+fs.getWorkingDirectory().toString()+"Delete----"+test2);
    conf.clear();
    boolean test = false;
    /***************************************
     * Now onto the next few iterations
     ***************************************/
    for (int i = 0; i < Integer.parseInt(otherArgs[3]); i++)
    {
        JobConf confNext = new JobConf(PageRankDriver.class);
        log.info("------------FRUIT LOOPS::::"+i);
        //log.info("----"+FileOutputFormat.getOutputPath(conf).getName());
        confNext.setJobName("PageRank");
        confNext.setBoolean("mapred.output.compress", false);
        confNext.setInputFormat(TextInputFormat.class);
        confNext.setOutputFormat(TextOutputFormat.class);
        log.info("Test debug 4---------------"+fs.getWorkingDirectory().toString());
        //FileOutputFormat.getOutputPath(conf);
        if(fs.exists(new Path(otherArgs[2])))
        {
            log.info("--------------------------output exists-----------------------");
            /*fs.delete(new Path(otherArgs[2]),true);*/
        }
        TextOutputFormat.setOutputPath(confNext, new Path(otherArgs[2]));
        log.info("----"+otherArgs[2]);
        if(fs.exists(new Path(pre+"temp.txt")))
        {
            log.info("--------------------------temp.txt exists-----------------------");
        }
        TextInputFormat.setInputPaths(confNext, new Path(pre+"temp.txt"));
        /*FileInputFormat.setInputPaths(confNext, new Path(pre+"temp.txt"));*/


        Path[] path=TextInputFormat.getInputPaths(confNext);
        for (int j = 0; j < path.length; j++) {
            log.info("---------"+path[j].getName()+"------------");
            log.info("------------"+pre+"------------");
        }
        log.info("----"+TextOutputFormat.getOutputPath(confNext).getName());

        confNext.setMapperClass(PageRankSecondMapper.class);
        confNext.setReducerClass(PageRankSecondReducer.class);

        confNext.setOutputKeyClass(Text.class);
        confNext.setOutputValueClass(Text.class);
                    /*ERROR PRINTED BELOW is thrown from here*/ 
        RunningJob jb=JobClient.runJob(confNext);
        if(jb.isSuccessful())
            log.info("Yes-------"+jb.getTrackingURL());
        /*fs.setPermission(new Path(pre+"part-00000"), FsPermission.getFileDefault());*/

        test=fs.delete(new Path(pre+"temp.txt"),true);
        log.info("DELETE-temp.txt------------"+test);
        log.info("temp.txt exists----------"+fs.exists(new Path(pre+"temp.txt")));

        log.info(otherArgs[2]+"/part-00000"+"::"+pre+"temp.txt");

        test=fs.rename(new Path(otherArgs[2]+"/part-00000"), new Path(pre+"temp.txt"));
        /*Path src1=new Path(otherArgs[2]+"/part-00000");
        Path dst1=new Path(pre);
        fs.copyToLocalFile(src1, dst1);
        fs.setPermission(new Path(otherArgs[2]), FsPermission.getFileDefault());*/
        fs.delete(new Path(otherArgs[2]),true);
        log.info("After delete and rename;temp.txt exists----------"+fs.exists(new Path(pre+"temp.txt")));
        log.info("After delete and rename;Output exists----------"+fs.exists(new Path(otherArgs[2])));
        confNext.clear();
    }
    log.info("Exiting Main---------------------------");
}

}

在 EMR 中运行上述代码时出现以下异常。有趣的是,For 循环运行第一次迭代时没有大惊小怪。在第一次运行中访问相同的路径,在第二次运行中抛出以下异常

Exception in thread "main" org.apache.hadoop.mapred.InvalidInputException: Input path does not exist: s3://awsee599/temp.txt
    at org.apache.hadoop.mapred.FileInputFormat.listStatus(FileInputFormat.java:197)
    at org.apache.hadoop.mapred.FileInputFormat.getSplits(FileInputFormat.java:208)
    at org.apache.hadoop.mapred.JobClient.writeOldSplits(JobClient.java:1044)
    at org.apache.hadoop.mapred.JobClient.writeSplits(JobClient.java:1036)
    at org.apache.hadoop.mapred.JobClient.access$700(JobClient.java:174)
    at org.apache.hadoop.mapred.JobClient$2.run(JobClient.java:952)
    at org.apache.hadoop.mapred.JobClient$2.run(JobClient.java:905)
    at java.security.AccessController.doPrivileged(Native Method)
    at javax.security.auth.Subject.doAs(Subject.java:415)
    at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1132)
    at org.apache.hadoop.mapred.JobClient.submitJobInternal(JobClient.java
4

0 回答 0