
I am building a sample Map/Reduce job on a Hadoop cluster with two nodes (one master/slave and one slave). Here is my setup:

$HADOOP_HOME = /usr/local/hadoop
My M/R classfiles path = $HADOOP_HOME/MyMapRed_classes
My Mapper classfile = $HADOOP_HOME/MyMapRed_classes/MyMapper
My Reducer classfile = $HADOOP_HOME/MyMapRed_classes/MyReducer
My Jar path = $HADOOP_HOME/MyMapred/MyMapRed.jar
My HDFS Input Path = /user/hadoop/MyMapRed/inputfile
My HDFS Output Path = /user/hadoop/MyMapRed_output

I am running the M/R job as follows:

<myusername>@localhost:/usr/local/hadoop$ bin/hadoop jar $HADOOP_HOME/MyMapRed/MyMapRed.jar -input /user/hadoop/MyMapRed/inputfile -output /user/hadoop/MyMapRed_output/ -mapper $HADOOP_HOME/MyMapRed_classes/MyMapper -reducer $HADOOP_HOME/MyMapRed_classes/MyReducer

But it does not seem to be able to find the input file, judging from the message below:

Exception in thread "main" java.lang.ClassNotFoundException: -input
at java.net.URLClassLoader$1.run(URLClassLoader.java:217)
at java.security.AccessController.doPrivileged(Native Method)
at java.net.URLClassLoader.findClass(URLClassLoader.java:205)
at java.lang.ClassLoader.loadClass(ClassLoader.java:321)
at java.lang.ClassLoader.loadClass(ClassLoader.java:266)
at java.lang.Class.forName0(Native Method)
at java.lang.Class.forName(Class.java:264)
at org.apache.hadoop.util.RunJar.main(RunJar.java:149)

Below is the MyMapRed class I am using. It takes a list of (Group, Val) pairs as input, and the reducer should output the average Val for each Group.
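For example, with hypothetical tab-separated input lines such as

GroupA	1.0
GroupA	3.0
GroupB	2.0

the expected output would be the per-group averages:

GroupA	2.0
GroupB	2.0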

import java.io.IOException;
import java.util.Iterator;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.DoubleWritable;
import org.apache.hadoop.io.Text;

import org.apache.hadoop.mapred.FileInputFormat;
import org.apache.hadoop.mapred.FileOutputFormat;
import org.apache.hadoop.mapred.MapReduceBase;
import org.apache.hadoop.mapred.Mapper;
import org.apache.hadoop.mapred.OutputCollector;
import org.apache.hadoop.mapred.Reducer;
import org.apache.hadoop.mapred.Reporter;
import org.apache.hadoop.mapred.JobClient;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.TextInputFormat;
import org.apache.hadoop.mapred.TextOutputFormat;


public class MyMapRed {

public static class MyMapper extends MapReduceBase 
implements Mapper<Text, Text, Text, DoubleWritable> {

    private final static Text Group = new Text();
    private final static DoubleWritable Val = new DoubleWritable();

    public void map(Text key, Text value, OutputCollector<Text, DoubleWritable> output, Reporter reporter) 
        throws IOException {
        String line = value.toString();
        String[] KeyAndVal = line.split("\t",2);
        Group.set(KeyAndVal[0]);
        Val.set(Double.valueOf(KeyAndVal[1]));
        output.collect(Group, Val);
    }
}

public static class MyReducer extends MapReduceBase 
implements Reducer<Text, DoubleWritable, Text, DoubleWritable> {
    public void reduce(Text key, Iterator<DoubleWritable> values,
            OutputCollector<Text, DoubleWritable> output, Reporter reporter)
            throws IOException {
        DoubleWritable val = new DoubleWritable();
        double valSum = 0.0;
        int valCnt = 0;
        while (values.hasNext()) {
            val = values.next();
            valSum += val.get();
            valCnt++;
        }
        if (valCnt>0)
            valSum = valSum/valCnt;
        output.collect(key, new DoubleWritable(valSum));
    }
}


public static void main(String[] args) {
    JobClient client = new JobClient();
    JobConf conf = new JobConf(MyMapRed.class);
    conf.setJobName("MyMapRed");

    conf.setOutputKeyClass(Text.class);
    conf.setOutputValueClass(DoubleWritable.class);

    conf.setMapperClass(MyMapper.class);
    conf.setReducerClass(MyReducer.class);

    conf.setInputFormat(TextInputFormat.class);
    conf.setOutputFormat(TextOutputFormat.class);

    FileInputFormat.addInputPath(conf, new Path("input"));
    FileOutputFormat.setOutputPath(conf, new Path("output"));

    client.setConf(conf);

    try {
        JobClient.runJob(conf);
    } catch (Exception e) {
        e.printStackTrace();
    }

}
}

Can anyone suggest what I am missing that causes the ClassNotFoundException?


1 Answer


If your jar file does not define a main class in its manifest, Hadoop expects the first argument after the jar to be the name of the class containing the static main method. In your command that argument is "-input", which Hadoop tries to load as a class, hence the ClassNotFoundException.
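As a rough sketch (assuming the driver class is referred to by its simple name MyMapRed, and noting that the posted main() currently ignores its arguments and hard-codes the paths), the invocation with the main class named explicitly would look something like:

bin/hadoop jar $HADOOP_HOME/MyMapRed/MyMapRed.jar MyMapRed /user/hadoop/MyMapRed/inputfile /user/hadoop/MyMapRed_output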

You need to name your main class on the command line, i.e. the class that contains the public static void main(String[] args) method and configures the job via the JobConf or Job classes.
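A minimal sketch of a main() that picks the input and output paths up from those command-line arguments instead of the hard-coded "input"/"output" paths (an assumption about the intended behaviour, not part of the original code):

public static void main(String[] args) throws IOException {
    JobConf conf = new JobConf(MyMapRed.class);
    conf.setJobName("MyMapRed");

    conf.setOutputKeyClass(Text.class);
    conf.setOutputValueClass(DoubleWritable.class);
    conf.setMapperClass(MyMapper.class);
    conf.setReducerClass(MyReducer.class);
    conf.setInputFormat(TextInputFormat.class);
    conf.setOutputFormat(TextOutputFormat.class);

    // take the HDFS paths from the command line: args[0] = input, args[1] = output
    FileInputFormat.addInputPath(conf, new Path(args[0]));
    FileOutputFormat.setOutputPath(conf, new Path(args[1]));

    JobClient.runJob(conf);
}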

It also looks like you are trying to use Streaming-style options (-input, -output, -mapper, -reducer); those are not needed when your mapper and reducer are written in Java.
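For comparison, the -input/-output/-mapper/-reducer options belong to Hadoop Streaming, which runs external programs (typically scripts) as the mapper and reducer, invoked roughly like this (the streaming jar location varies between Hadoop versions, so treat that path as an assumption):

bin/hadoop jar $HADOOP_HOME/contrib/streaming/hadoop-streaming-*.jar -input /user/hadoop/MyMapRed/inputfile -output /user/hadoop/MyMapRed_output -mapper /path/to/mapper-script -reducer /path/to/reducer-script

Since MyMapper and MyReducer are Java classes packaged inside MyMapRed.jar, the plain "hadoop jar" form above is the one to use.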

answered 2012-06-23T21:07:06.010