我正在尝试使用 SpringData 运行 mapreduce。我创建了一个 Mapper 类、Reducer 类、Main 类和应用程序 context.xml。
应用程序上下文由 Hadoop 配置组成。Hadoop 是在基于 linux 的 ubuntu 机器中设置的。程序是从 windows 环境执行的。
在 Hadoop 中,在 Coresite.xml 中,对于身份验证,我提供了一个用户名,程序将通过该用户名执行。
我的配置文件如下:
<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:hdp="http://www.springframework.org/schema/hadoop"
xmlns:context="http://www.springframework.org/schema/context"
xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd
http://www.springframework.org/schema/hadoop http://www.springframework.org/schema/hadoop/spring-hadoop.xsd
http://www.springframework.org/schema/context http://www.springframework.org/schema/context/spring-context-3.2.xsd">
<context:property-placeholder location="resources/hadoop.properties" />
<hdp:configuration>
fs.default.name=${fs.default.name}
mapred.job.tracker=${mapred.job.tracker}
</hdp:configuration>
<bean id="jobConf" class="org.apache.hadoop.mapred.JobConf">
<constructor-arg ref="hadoopConfiguration"></constructor-arg>
<constructor-arg>
<value type="java.lang.Class">com.hadoop.basics.WordCounter</value>
</constructor-arg>
</bean>
<hdp:job id="wcJob" configuration-ref="jobConf"
mapper="com.hadoop.basics.WordMapper"
reducer="com.hadoop.basics.WordReducer" input-path="${wordcount.input.path}"
output-path="${wordcount.output.path}" user="hduser">
</hdp:job>
<hdp:job-runner id="wcJobRunner" job-ref="wcJob" run-at-startup="true" />
</beans>
and My main class is
package com.hadoop.basics;
import org.springframework.context.support.AbstractApplicationContext;
import org.springframework.context.support.ClassPathXmlApplicationContext;
public class Main {
public static void main(String[] arguments) {
AbstractApplicationContext context = new ClassPathXmlApplicationContext("applicationContext.xml", com.hadoop.basics.Main.class);
System.out.println("Job running");
}
}
and my Mapper and reducer classes are as follows
package com.hadoop.basics;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
import java.io.IOException;
import java.util.StringTokenizer;
public class WordMapper extends Mapper<LongWritable, Text, Text, IntWritable> {
private Text word = new Text();
@Override
protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
String line = value.toString();
StringTokenizer lineTokenizer = new StringTokenizer(line);
while (lineTokenizer.hasMoreTokens()) {
String cleaned = removeNonLettersOrNumbers(lineTokenizer.nextToken());
word.set(cleaned);
context.write(word, new IntWritable(1));
}
}
private String removeNonLettersOrNumbers(String original) {
return original.replaceAll("[^\\p{L}\\p{N}]", "");
}
}
package com.hadoop.basics;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;
import java.io.IOException;
public class WordReducer extends Reducer<Text, IntWritable, Text, IntWritable> {
protected static final String TARGET_WORD = "Watson";
@Override
protected void reduce(Text key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException {
if (containsTargetWord(key)) {
int wordCount = 0;
for (IntWritable value: values) {
wordCount += value.get();
}
context.write(key, new IntWritable(wordCount));
}
}
private boolean containsTargetWord(Text key) {
return key.toString().equals(TARGET_WORD);
}
}
and i am getting the following exception
INFO: Pre-instantiating singletons in org.springframework.beans.factory.support.DefaultListableBeanFactory@3b67a7: defining beans [org.springframework.context.support.PropertySourcesPlaceholderConfigurer#0,hadoopConfiguration,jobConf,wcJob,wcJobRunner]; root of factory hierarchy
Oct 31, 2013 5:03:15 PM org.springframework.data.hadoop.mapreduce.JobExecutor$2 run
INFO: Starting job [wcJob]
Oct 31, 2013 5:03:15 PM org.apache.hadoop.mapred.JobClient copyAndConfigureFiles
WARNING: No job jar file set. User classes may not be found. See JobConf(Class) or JobConf#setJar(String).
Oct 31, 2013 5:03:15 PM org.apache.hadoop.mapreduce.lib.input.FileInputFormat listStatus
INFO: Total input paths to process : 1
Oct 31, 2013 5:03:15 PM org.apache.hadoop.util.NativeCodeLoader <clinit>
WARNING: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
Oct 31, 2013 5:03:15 PM org.apache.hadoop.io.compress.snappy.LoadSnappy <clinit>
WARNING: Snappy native library not loaded
Oct 31, 2013 5:03:15 PM org.apache.hadoop.mapred.JobClient monitorAndPrintJob
INFO: Running job: job_201310301940_0007
Oct 31, 2013 5:03:17 PM org.apache.hadoop.mapred.JobClient monitorAndPrintJob
INFO: map 0% reduce 0%
Oct 31, 2013 5:03:38 PM org.apache.hadoop.mapred.JobClient monitorAndPrintJob
INFO: Task Id : attempt_201310301940_0007_m_000000_0, Status : FAILED
java.lang.RuntimeException: java.lang.ClassNotFoundException: com.hadoop.basics.WordMapper
at org.apache.hadoop.conf.Configuration.getClass(Configuration.java:857)
at org.apache.hadoop.mapreduce.JobContext.getMapperClass(JobContext.java:199)
at org.apache.hadoop.mapred.MapTask.runNewMapper(MapTask.java:718)
at org.apache.hadoop.mapred.MapTask.run(MapTask.java:364)
at org.apache.hadoop.mapred.Child$4.run(Child.java:255)
at java.security.AccessController.doPrivileged(Native Method)
at javax.security.auth.Subject.doAs(Subject.java:415)
at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1190)
at org.apache.hadoop.mapred.Child.main(Child.java:249)
Caused by: java.lang.ClassNotFoundException: com.hadoop.basics.WordMapper
at java.net.URLClassLoader$1.run(URLClassLoader.java:366)
at java.net.URLClassLoader$1.run(URLClassLoader.java:355)
at java.security.AccessController.doPrivileged(Native Method)
at java.net.URLClassLoader.findClass(URLClassLoader.java:354)
at java.lang.ClassLoader.loadClass(ClassLoader.java:425)
at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:308)
at java.lang.ClassLoader.loadClass(ClassLoader.java:358)
at java.lang.Class.forName0(Native Method)
at java.lang.Class.forName(Class.java:270)
at org.apache.hadoop.conf.Configuration.getClassByName(Configuration.java:810)
at org.apache.hadoop.conf.Configuration.getClass(Configuration.java:855)
... 8 more
请帮帮我..