I'm implementing PageRank on Amazon Web Services using Elastic MapReduce. My code is below. The problem is that the rename and delete methods of the FileSystem API (with Hadoop 2.2.0) don't seem to be doing their job.
Am I missing something?
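To make the question concrete: the pattern my driver depends on between iterations boils down to the sketch below (the standalone class and hard-coded paths are just for illustration; the bucket s3://awsee599 and the temp.txt name match the real code further down).

import java.net.URI;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

// Minimal illustration of the rename/delete pattern the driver relies on.
// This class is only a sketch, not part of the actual job.
public class RenameDeleteCheck {
    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        FileSystem fs = FileSystem.get(URI.create("s3://awsee599/"), conf);

        Path jobOutput = new Path("s3://awsee599/output/part-00000");
        Path nextInput = new Path("s3://awsee599/temp.txt");

        // Both calls report success/failure through their boolean return
        // value rather than by throwing, so the results are printed.
        boolean renamed = fs.rename(jobOutput, nextInput);
        boolean deleted = fs.delete(new Path("s3://awsee599/output"), true);

        System.out.println("rename returned " + renamed
                + ", delete returned " + deleted
                + ", temp.txt exists: " + fs.exists(nextInput));
    }
}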
import java.io.IOException;
import java.net.URI;

import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.JobClient;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.RunningJob;
import org.apache.hadoop.mapred.TextInputFormat;
import org.apache.hadoop.mapred.TextOutputFormat;
import org.apache.hadoop.util.GenericOptionsParser;
import org.apache.log4j.Logger;

public class PageRankDriver {

    private static Logger log = Logger.getLogger(PageRankDriver.class.toString());

    public static void main(String[] args) throws IOException {
        if (args.length != 4 || Integer.parseInt(args[3]) < 0) {
            System.err.println("Usage: PageRank <nodes path> <edges path> <output path> <# of iterations>");
            System.exit(-1);
        }

        // Initial config
        JobConf conf = new JobConf(PageRankDriver.class);
        log.info("Test Debug 1");
        FileSystem fs = FileSystem.get(URI.create(args[0]), conf);
        String[] otherArgs = new GenericOptionsParser(conf, args).getRemainingArgs();
        // Count the number of nodes (one line per node in the nodes file)
        FSDataInputStream nodesFile = fs.open(new Path(otherArgs[0]));
        int count = 0;
        while (nodesFile.readLine() != null) {
            count++;
        }
        nodesFile.close();
        conf.set("NODE_COUNT", count + "");

        // Delete the old output folder
        fs.delete(new Path(otherArgs[2]), true);
        conf.setJobName("PageRank");
        conf.setInputFormat(TextInputFormat.class);
        conf.setOutputFormat(TextOutputFormat.class);
        TextOutputFormat.setOutputPath(conf, new Path(otherArgs[2]));
        TextInputFormat.setInputPaths(conf, new Path(otherArgs[1]));
        conf.setMapperClass(PageRankFirstMapper.class);
        conf.setReducerClass(PageRankFirstReducer.class);
        log.info("----" + TextOutputFormat.getOutputPath(conf).getName());
        conf.setBoolean("mapred.output.compress", false);
        conf.setOutputKeyClass(Text.class);
        conf.setOutputValueClass(Text.class);
        conf.setJarByClass(PageRankDriver.class);
        conf.setJar("PageRank.jar");
        JobClient.runJob(conf);
        log.info("Test Debug 2");
        // Adjust the path prefix for HDFS/EMR vs. the local machine
        String pre = "";
        if (fs.getWorkingDirectory().toString().contains("hdfs")
                || fs.getWorkingDirectory().toString().contains("hadoop")) {
            pre = "s3://awsee599/";
        }

        // Also tried copying instead of renaming:
        // Path src = new Path(otherArgs[2] + "/part-00000");
        // Path dst = new Path(pre);
        // fs.copyToLocalFile(src, dst);

        boolean test1 = fs.rename(new Path(otherArgs[2] + "/part-00000"), new Path(pre + "temp.txt"));
        log.info("Test Debug 3------------" + pre + " Working directory::"
                + fs.getWorkingDirectory().toString() + " Rename----------" + test1);
        boolean test2 = fs.delete(new Path(otherArgs[2]), true);
        log.info("Test Debug 3------------" + pre + " Working directory::"
                + fs.getWorkingDirectory().toString() + " Delete----" + test2);
        conf.clear();
        boolean test = false;
        /***************************************
         * Now onto the next few iterations
         ***************************************/
        for (int i = 0; i < Integer.parseInt(otherArgs[3]); i++) {
            JobConf confNext = new JobConf(PageRankDriver.class);
            log.info("------------FRUIT LOOPS::::" + i);
            confNext.setJobName("PageRank");
            confNext.setBoolean("mapred.output.compress", false);
            confNext.setInputFormat(TextInputFormat.class);
            confNext.setOutputFormat(TextOutputFormat.class);
            log.info("Test debug 4---------------" + fs.getWorkingDirectory().toString());
            if (fs.exists(new Path(otherArgs[2]))) {
                log.info("--------------------------output exists-----------------------");
                // fs.delete(new Path(otherArgs[2]), true);
            }
            TextOutputFormat.setOutputPath(confNext, new Path(otherArgs[2]));
            log.info("----" + otherArgs[2]);
            if (fs.exists(new Path(pre + "temp.txt"))) {
                log.info("--------------------------temp.txt exists-----------------------");
            }
            TextInputFormat.setInputPaths(confNext, new Path(pre + "temp.txt"));
            Path[] path = TextInputFormat.getInputPaths(confNext);
            for (int j = 0; j < path.length; j++) {
                log.info("---------" + path[j].getName() + "------------");
                log.info("------------" + pre + "------------");
            }
            log.info("----" + TextOutputFormat.getOutputPath(confNext).getName());
            confNext.setMapperClass(PageRankSecondMapper.class);
            confNext.setReducerClass(PageRankSecondReducer.class);
            confNext.setOutputKeyClass(Text.class);
            confNext.setOutputValueClass(Text.class);

            // The exception printed below is thrown from here
            RunningJob jb = JobClient.runJob(confNext);
            if (jb.isSuccessful()) {
                log.info("Yes-------" + jb.getTrackingURL());
            }

            // Also tried fs.setPermission(new Path(pre + "part-00000"), FsPermission.getFileDefault()); here
            test = fs.delete(new Path(pre + "temp.txt"), true);
            log.info("DELETE-temp.txt------------" + test);
            log.info("temp.txt exists----------" + fs.exists(new Path(pre + "temp.txt")));
            log.info(otherArgs[2] + "/part-00000" + "::" + pre + "temp.txt");
            test = fs.rename(new Path(otherArgs[2] + "/part-00000"), new Path(pre + "temp.txt"));
            // Also tried fs.copyToLocalFile(...) plus fs.setPermission(...) here instead of the rename
            fs.delete(new Path(otherArgs[2]), true);
            log.info("After delete and rename; temp.txt exists----------" + fs.exists(new Path(pre + "temp.txt")));
            log.info("After delete and rename; Output exists----------" + fs.exists(new Path(otherArgs[2])));
            confNext.clear();
        }
        log.info("Exiting Main---------------------------");
    }
}
When I run the code above on EMR, I get the exception below. Interestingly, the for loop runs its first iteration without a fuss: the very same path that is read fine on the first pass throws the following exception on the second one.
Exception in thread "main" org.apache.hadoop.mapred.InvalidInputException: Input path does not exist: s3://awsee599/temp.txt
at org.apache.hadoop.mapred.FileInputFormat.listStatus(FileInputFormat.java:197)
at org.apache.hadoop.mapred.FileInputFormat.getSplits(FileInputFormat.java:208)
at org.apache.hadoop.mapred.JobClient.writeOldSplits(JobClient.java:1044)
at org.apache.hadoop.mapred.JobClient.writeSplits(JobClient.java:1036)
at org.apache.hadoop.mapred.JobClient.access$700(JobClient.java:174)
at org.apache.hadoop.mapred.JobClient$2.run(JobClient.java:952)
at org.apache.hadoop.mapred.JobClient$2.run(JobClient.java:905)
at java.security.AccessController.doPrivileged(Native Method)
at javax.security.auth.Subject.doAs(Subject.java:415)
at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1132)
at org.apache.hadoop.mapred.JobClient.submitJobInternal(JobClient.java
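One thing I'm planning to try, to see whether the rename has simply not become visible yet when the next job is submitted, is to poll for the file before calling runJob. A rough sketch of that instrumentation (the class name, attempt count, and sleep interval are placeholders of mine, not part of the code above):

import java.net.URI;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

// Instrumentation sketch: poll for the renamed file before submitting the
// next job, to check whether the rename just hasn't become visible yet.
public class WaitForInput {
    public static boolean waitForPath(FileSystem fs, Path p, int attempts) throws Exception {
        for (int i = 0; i < attempts; i++) {
            if (fs.exists(p)) {
                return true;        // path is visible; safe to submit the job
            }
            Thread.sleep(1000);     // back off briefly, then re-check
        }
        return false;               // never showed up; log it and bail out
    }

    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        FileSystem fs = FileSystem.get(URI.create("s3://awsee599/"), conf);
        System.out.println("visible: "
                + waitForPath(fs, new Path("s3://awsee599/temp.txt"), 30));
    }
}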