I have MapReduce code that reads data from an FTP server. The code we use to connect to the FTP server is as follows:`
// Input and output paths are taken straight from the command line.
String inputPath = args[0];
String outputPath = args[1];
Configuration conf1 = new Configuration();
// Generic Hadoop options are parsed into conf1; otherArgs is not used below.
String[] otherArgs = new GenericOptionsParser(conf1, args).getRemainingArgs();
Path arg = new Path(inputPath);
FTPFileSystem ftpfs = new FTPFileSystem();
Path arg1 = new Path(outputPath);
ftpfs.setConf(conf1);
// URL-encode the credentials before embedding them in the FTP URL.
String ftpUser = URLEncoder.encode("username", "UTF-8");
String ftpPass = URLEncoder.encode("password", "UTF-8");
String url = String.format("ftp://%s:%s@ftpserver.com",
        ftpUser, ftpPass);
// Initialize the FTP file system with conf1.
ftpfs.initialize(new URI(url), conf1);
// The job is configured through a separate JobConf.
JobConf conf = new JobConf(FTPIF.class);
FileOutputFormat.setOutputPath(conf, arg1);
// Qualify the input path against the FTP file system.
FileInputFormat.setInputPaths(conf, ftpfs.makeQualified(arg));
conf.setOutputKeyClass(Text.class);
conf.setOutputValueClass(NullWritable.class);
conf.setOutputFormat(TextOutputFormat.class);
conf.setInputFormat(CustomInputFormat.class);
conf.setMapperClass(CustomMap.class);
conf.setReducerClass(CustomReduce.class);
JobClient.runJob(conf);
`
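To rule out the URL construction itself, here is a minimal standalone sketch (JDK only, no Hadoop) that builds the URL the same way as above and prints what `java.net.URI` extracts from it. The class name `FtpUrlCheck` and the helper `parse` are hypothetical and are not part of the job; the credentials and host are the same placeholders used in the snippet.

```java
import java.io.UnsupportedEncodingException;
import java.net.URI;
import java.net.URLEncoder;

// Hypothetical helper, not part of the MapReduce job: checks how the
// FTP URL built in the snippet above is parsed by java.net.URI.
class FtpUrlCheck {

    // Builds "ftp://user:pass@host" with URL-encoded credentials and parses it.
    static URI parse(String user, String pass, String host) {
        try {
            String url = String.format("ftp://%s:%s@%s",
                    URLEncoder.encode(user, "UTF-8"),
                    URLEncoder.encode(pass, "UTF-8"),
                    host);
            // URI.create throws IllegalArgumentException on a malformed URL.
            return URI.create(url);
        } catch (UnsupportedEncodingException e) {
            // UTF-8 is always available, so this should not happen.
            throw new IllegalStateException(e);
        }
    }

    public static void main(String[] args) {
        URI uri = parse("username", "password", "ftpserver.com");
        System.out.println("host     = " + uri.getHost());
        System.out.println("port     = " + uri.getPort());      // -1 means no explicit port
        System.out.println("userInfo = " + uri.getUserInfo());  // decoded user:password
    }
}
```

With the placeholder values this prints `ftpserver.com` as the host and `username:password` as the user info, so the URL by itself carries the host and credentials. It says nothing about which configuration the job uses when it connects.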
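The problem is that this code works fine in pseudo-distributed mode, but when run on the cluster it fails with a "Login failed on server" error. The stack trace is: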
ERROR security.UserGroupInformation: PriviledgedActionException as:username (auth:SIMPLE) cause:java.io.IOException: Login failed on server - 0.0.0.0, port - 21
Exception in thread "main" java.io.IOException: Login failed on server - 0.0.0.0, port - 21
at org.apache.hadoop.fs.ftp.FTPFileSystem.connect(FTPFileSystem.java:133)
at org.apache.hadoop.fs.ftp.FTPFileSystem.getFileStatus(FTPFileSystem.java:389)
at org.apache.hadoop.fs.FileSystem.getFileStatus(FileSystem.java:2106)
at org.apache.hadoop.fs.FileSystem.globStatusInternal(FileSystem.java:1566)
at org.apache.hadoop.fs.FileSystem.globStatus(FileSystem.java:1503)
at org.apache.hadoop.mapred.FileInputFormat.listStatus(FileInputFormat.java:174)
at org.apache.hadoop.mapred.FileInputFormat.getSplits(FileInputFormat.java:205)
at org.apache.hadoop.mapred.JobClient.writeOldSplits(JobClient.java:1041)
at org.apache.hadoop.mapred.JobClient.writeSplits(JobClient.java:1033)
at org.apache.hadoop.mapred.JobClient.access$600(JobClient.java:172)
at org.apache.hadoop.mapred.JobClient$2.run(JobClient.java:943)
at org.apache.hadoop.mapred.JobClient$2.run(JobClient.java:896)
at java.security.AccessController.doPrivileged(Native Method)
at javax.security.auth.Subject.doAs(Subject.java:396)
at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1332)
at org.apache.hadoop.mapred.JobClient.submitJobInternal(JobClient.java:896)
at org.apache.hadoop.mapred.JobClient.submitJob(JobClient.java:870)
at org.apache.hadoop.mapred.JobClient.runJob(JobClient.java:1319)
at FTPIF.run(FTPIF.java:164)
at org.apache.hadoop.util.ToolRunner.run(ToolRunner.java:70)
at org.apache.hadoop.util.ToolRunner.run(ToolRunner.java:84)
at FTPIF.main(FTPIF.java:169)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:39)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:25)
at java.lang.reflect.Method.invoke(Method.java:597)
at org.apache.hadoop.util.RunJar.main(RunJar.java:208)
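The cluster has network connectivity to the FTP server, and the credentials are correct. Note that the error reports server 0.0.0.0 rather than ftpserver.com. Any idea why the code cannot connect to the FTP server?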