我正在编写一个小型 Java 程序，用于从 Windows 机器向远程 Hadoop 机器提交作业。作业的输入和输出都使用 MongoDB。该作业在服务器上运行时失败，以下是我从 JobTracker 得到的错误：
Job initialization failed: java.net.UnknownHostException: <my ip>:27017 is not a valid Inet address
    at org.apache.hadoop.net.NetUtils.verifyHostnames(NetUtils.java:587)
    at org.apache.hadoop.mapred.JobInProgress.initTasks(JobInProgress.java:734)
    at org.apache.hadoop.mapred.JobTracker.initJob(JobTracker.java:3890)
    at org.apache.hadoop.mapred.EagerTaskInitializationListener$InitJob.run(EagerTaskInitializationListener.java:79)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
    at java.lang.Thread.run(Thread.java:724)
谁能帮我解决这个问题？下面是我的 Java 代码：
/**
 * Driver that submits a MongoDB-backed map/reduce job to a remote Hadoop JobTracker.
 *
 * <p>The job reads from the collection named by {@code mongo.input.uri}, runs
 * {@link TreasuryYieldMapper} and {@link TreasuryYieldReducer}, and writes to the
 * collection named by {@code mongo.output.uri}.
 *
 * <p>NOTE(review): the reported {@code UnknownHostException: <my ip>:27017 is not a valid
 * Inet address} is thrown from {@code NetUtils.verifyHostnames} during
 * {@code JobInProgress.initTasks}, i.e. on the JobTracker side. It is presumably caused by
 * input split locations that carry a {@code host:port} value rather than by anything in this
 * driver -- verify against the mongo-hadoop connector version in use.
 */
public class MyFirstJob extends Configured implements Tool {

    /**
     * Configures the job and blocks until it finishes.
     *
     * @param args command-line arguments left over after generic option parsing; not used
     * @return {@code 0} when the job completes successfully, {@code 1} otherwise
     * @throws Exception if job setup or submission fails
     */
    public int run(String[] args) throws Exception {
        Configuration conf = getConf();
        conf.set("mapred.job.tracker", "<my ip>:54311");
        conf.set("mongo.input.uri", "mongodb://<my ip>:27017/hadoop_in.yield_historical_in");
        conf.set("mongo.output.uri", "mongodb://<my ip>:27017/hadoop_out.yield_historical_out");

        Job job = new Job(conf, "Word Count");
        job.setJarByClass(MyFirstJob.class);

        // Input comes from MongoDB.
        job.setInputFormatClass(com.mongodb.hadoop.MongoInputFormat.class);

        // Intermediate types must match what TreasuryYieldMapper actually writes
        // (IntWritable year -> DoubleWritable bid); declaring anything else makes the
        // map phase fail with a type-mismatch error.
        job.setMapperClass(TreasuryYieldMapper.class);
        job.setMapOutputKeyClass(org.apache.hadoop.io.IntWritable.class);
        job.setMapOutputValueClass(org.apache.hadoop.io.DoubleWritable.class);

        // Final types must match what TreasuryYieldReducer writes
        // (IntWritable year -> BSONWritable summary document).
        job.setReducerClass(TreasuryYieldReducer.class);
        job.setOutputFormatClass(com.mongodb.hadoop.MongoOutputFormat.class);
        job.setOutputKeyClass(org.apache.hadoop.io.IntWritable.class);
        job.setOutputValueClass(com.mongodb.hadoop.io.BSONWritable.class);

        // No combiner: a combiner must emit the map output types, but
        // TreasuryYieldReducer emits BSONWritable values, so it cannot be reused here.

        return job.waitForCompletion(true) ? 0 : 1;
    }

    /**
     * Runs the job as the remote user {@code hduser} and exits with the job's status code.
     *
     * @param args command-line arguments, forwarded to {@link ToolRunner} so that generic
     *             Hadoop options (e.g. {@code -D key=value}) are honoured
     * @throws Exception if the job cannot be submitted
     */
    public static void main(final String[] args) throws Exception {
        System.setProperty("HADOOP_USER_NAME", "hduser");
        UserGroupInformation ugi = UserGroupInformation.createRemoteUser("hduser");
        Integer exitCode = ugi.doAs(new PrivilegedExceptionAction<Integer>() {
            @Override
            public Integer run() throws Exception {
                Configuration conf = new Configuration();
                conf.set("hadoop.job.ugi", "hduser");
                return ToolRunner.run(conf, new MyFirstJob(), args);
            }
        });
        // Surface job failure to the calling shell instead of always exiting with 0.
        System.exit(exitCode);
    }
}
/**
 * Mapper that turns each input BSON document into a (year, 10-year bid) pair.
 *
 * <p>The year is derived from the document's {@code _id} field, which is cast to
 * {@link Date}; the bid is read from the {@code bc10Year} field, which is cast to
 * {@link Number}.
 */
public class TreasuryYieldMapper
        extends Mapper<Object, BSONObject, IntWritable, DoubleWritable> {

    private static final Log LOG = LogFactory.getLog(TreasuryYieldMapper.class);

    /**
     * Emits one (year, bid) record for the given document.
     *
     * @param pKey     input key; not used
     * @param pValue   input document providing the {@code _id} and {@code bc10Year} fields
     * @param pContext task context the output pair is written to
     * @throws IOException          if writing the output fails
     * @throws InterruptedException if the task is interrupted while writing
     */
    @Override
    public void map(final Object pKey,
                    final BSONObject pValue,
                    final Context pContext)
            throws IOException, InterruptedException {
        // Date.getYear() is relative to 1900, hence the offset.
        final Date recordDate = (Date) pValue.get("_id");
        final int calendarYear = 1900 + recordDate.getYear();

        final Number tenYearBid = (Number) pValue.get("bc10Year");
        final IntWritable outKey = new IntWritable(calendarYear);
        final DoubleWritable outValue = new DoubleWritable(tenYearBid.doubleValue());

        pContext.write(outKey, outValue);
    }
}
/**
 * Reducer that summarises all values received for one key into a single BSON document
 * with the fields {@code count}, {@code avg} and {@code sum}.
 */
public class TreasuryYieldReducer
        extends Reducer<IntWritable, DoubleWritable, IntWritable, BSONWritable> {

    private static final Log LOG = LogFactory.getLog(TreasuryYieldReducer.class);

    /**
     * Totals and counts the values for {@code pKey}, then writes the summary document.
     *
     * @param pKey     key the values were grouped under
     * @param pValues  values to aggregate
     * @param pContext task context the summary is written to
     * @throws IOException          if writing the output fails
     * @throws InterruptedException if the task is interrupted while writing
     */
    @Override
    public void reduce(final IntWritable pKey,
                       final Iterable<DoubleWritable> pValues,
                       final Context pContext)
            throws IOException, InterruptedException {
        double total = 0;
        int samples = 0;
        for (final DoubleWritable sample : pValues) {
            samples++;
            total += sample.get();
        }

        final double mean = total / samples;
        LOG.debug("Average 10 Year Treasury for " + pKey.get() + " was " + mean);

        // Fields are added in the order count, avg, sum.
        final BasicBSONObject summary = new BasicBSONObject();
        summary.put("count", samples);
        summary.put("avg", mean);
        summary.put("sum", total);

        pContext.write(pKey, new BSONWritable(summary));
    }
}