我正在一个包含两个节点(主/从和从)的 hadoop 集群上构建示例 Map/Reduce 任务。以下是我的规格:
$HADOOP_HOME = /usr/local/hadoop
My M/R classfiles path = $HADOOP_HOME/MyMapRed_classes
My Mapper classfile = $HADOOP_HOME/MyMapRed_classes/MyMapper
My Reducer classfile = $HADOOP_HOME/MyMapRed_classes/MyReducer
My Jar path = $HADOOP_HOME/MyMapred/MyMapRed.jar
My HDFS Input Path = /user/hadoop/MyMapRed/inputfile
My HDFS Output Path = /user/hadoop/MyMapRed_output
我正在运行 M/R 任务如下
<myusername>@localhost:/usr/local/hadoop$ bin/hadoop jar $HADOOP_HOME/MyMapRed/MyMapRed.jar -input /user/hadoop/MyMapRed/inputfile -output /user/hadoop/MyMapRed_output/ -mapper $HADOOP_HOME/MyMapRed_classes/MyMapper -reducer $HADOOP_HOME/MyMapRed_classes/MyReducer
但似乎无法从下面的消息中找到输入文件
Exception in thread "main" java.lang.ClassNotFoundException: -input
at java.net.URLClassLoader$1.run(URLClassLoader.java:217)
at java.security.AccessController.doPrivileged(Native Method)
at java.net.URLClassLoader.findClass(URLClassLoader.java:205)
at java.lang.ClassLoader.loadClass(ClassLoader.java:321)
at java.lang.ClassLoader.loadClass(ClassLoader.java:266)
at java.lang.Class.forName0(Native Method)
at java.lang.Class.forName(Class.java:264)
at org.apache.hadoop.util.RunJar.main(RunJar.java:149)
下面是我正在使用的 MyMapRed 类。它有一个对列表作为输入。减速器应该给出每组的平均 Val。
import java.io.IOException;
import java.util.Iterator;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.DoubleWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.FileInputFormat;
import org.apache.hadoop.mapred.FileOutputFormat;
import org.apache.hadoop.mapred.MapReduceBase;
import org.apache.hadoop.mapred.Mapper;
import org.apache.hadoop.mapred.OutputCollector;
import org.apache.hadoop.mapred.Reducer;
import org.apache.hadoop.mapred.Reporter;
import org.apache.hadoop.mapred.JobClient;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.TextInputFormat;
import org.apache.hadoop.mapred.TextOutputFormat;
public class MyMapRed {
public static class MyMapper extends MapReduceBase
implements Mapper<Text, Text, Text, DoubleWritable> {
private final static Text Group = new Text();
private final static DoubleWritable Val = new DoubleWritable();
public void map(Text key, Text value, OutputCollector<Text, DoubleWritable> output, Reporter reporter)
throws IOException {
String line = value.toString();
String[] KeyAndVal = line.split("\t",2);
Group.set(KeyAndVal[0]);
Val.set(Double.valueOf(KeyAndVal[1]));
output.collect(Group, Val);
}
}
public static class MyReducer extends MapReduceBase
implements Reducer<Text, DoubleWritable, Text, DoubleWritable> {
public void reduce(Text key, Iterator<DoubleWritable> values,
OutputCollector<Text, DoubleWritable> output, Reporter reporter)
throws IOException {
DoubleWritable val = new DoubleWritable();
double valSum = 0.0;
int valCnt = 0;
while (values.hasNext()) {
val = values.next();
valSum += val.get();
valCnt++;
}
if (valCnt>0)
valSum = valSum/valCnt;
output.collect(key, new DoubleWritable(valSum));
}
}
public static void main(String[] args) {
JobClient client = new JobClient();
JobConf conf = new JobConf(MyMapRed.class);
conf.setJobName("MyMapRed");
conf.setOutputKeyClass(Text.class);
conf.setOutputValueClass(DoubleWritable.class);
conf.setMapperClass(MyMapper.class);
conf.setReducerClass(MyReducer.class);
conf.setInputFormat(TextInputFormat.class);
conf.setOutputFormat(TextOutputFormat.class);
FileInputFormat.addInputPath(conf, new Path("input"));
FileOutputFormat.setOutputPath(conf, new Path("output"));
client.setConf(conf);
try {
JobClient.runJob(conf);
} catch (Exception e) {
e.printStackTrace();
}
}
}
任何人都可以提出我错过的 ClassNotFoundException 吗?