我正在尝试使用 map-reduce 作业连接到 mysql 数据库的以下代码。我面临下面发布的以下错误。我在我的代码中放置了检查点,表明在作业实际运行之前的部分作业运行正确,之后作业失败......
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.util.Iterator;
import java.util.List;
import java.util.StringTokenizer;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.filecache.DistributedCache;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.mapred.FileInputFormat;
import org.apache.hadoop.mapred.FileOutputFormat;
import org.apache.hadoop.mapred.JobClient;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.MapReduceBase;
import org.apache.hadoop.mapred.Mapper;
import org.apache.hadoop.mapred.OutputCollector;
import org.apache.hadoop.mapred.Reducer;
import org.apache.hadoop.mapred.Reporter;
import org.apache.hadoop.mapred.SequenceFileOutputFormat;
import org.apache.hadoop.mapred.TextOutputFormat;
import org.apache.hadoop.mapred.lib.db.DBConfiguration;
//import org.apache.hadoop.mapred.lib.db.DBInputFormat;
import org.apache.hadoop.mapred.lib.db.DBInputFormat;
import org.apache.hadoop.mapred.lib.db.DBOutputFormat;
import org.apache.hadoop.mapred.lib.db.DBWritable;
public class TweetWordCount {
public static class TweetWordCountMapper extends MapReduceBase implements
Mapper<LongWritable, GetTweets, Text, IntWritable> {
private final static IntWritable intTwordsCount = new IntWritable(1);
private Text strTwoken = new Text();
public void map(LongWritable key, GetTweets value,
OutputCollector<Text, IntWritable> output, Reporter reporter)
throws IOException {
System.out.println("checkpoint4");
GetTweets tweets = new GetTweets();
tweets.strTweet = value.strTweet;
//TwitterTokenizer twokenizer = new TwitterTokenizer();
//List<String> twokens = twokenizer.twokenize(value.strTweet.toString());
output.collect(new Text(value.strTweet.toString()), intTwordsCount);
System.out.println("checkpoint5");
}
}
public static class TweetWordCountReducer extends MapReduceBase implements
Reducer<Text, IntWritable, Text, IntWritable> {
public void reduce(Text key, Iterator<IntWritable> values,
OutputCollector<Text, IntWritable> output, Reporter reporter)
throws IOException {
System.out.println("checkpoint6");
int intTwokenCount = 0;
while (values.hasNext()) {
intTwokenCount += values.next().get();
}
output.collect(key, new IntWritable(intTwokenCount));
System.out.println("checkpoint6");
}
}
public static void main(String[] args) throws Exception {
System.out.println("checkpoint1");
JobConf twokenJobConf = new JobConf(new Configuration(),TweetWordCount.class);
//JobConf twokenJobConf = new JobConf(TweetWordCount.class);
twokenJobConf.setJobName("twoken_count");
twokenJobConf.setInputFormat(DBInputFormat.class); //Set input format here
twokenJobConf.setOutputFormat(TextOutputFormat.class);// Sets the output format
Object out = new Path("twokens");
twokenJobConf.setMapperClass(TweetWordCountMapper.class);
twokenJobConf.setCombinerClass(TweetWordCountReducer.class);
twokenJobConf.setReducerClass(TweetWordCountReducer.class);
twokenJobConf.setOutputKeyClass(Text.class);
twokenJobConf.setOutputValueClass(IntWritable.class);
DBConfiguration.configureDB(twokenJobConf, "com.mysql.jdbc.Driver",
"jdbc:mysql://localhost/test", "root", "root"); //Specifies the DB configuration
String[] fields = {"Tweet"}; //Specifies the Fields to be fetched from DB
DBInputFormat.setInput(twokenJobConf, GetTweets.class, "NewGamil",
null /* conditions */, "Tweet", fields); // Specifies the DB table and fields
//SequenceFileOutputFormat.setOutputPath(twokenJobConf, (Path) out);
FileOutputFormat.setOutputPath(twokenJobConf, (Path) out);
System.out.println("checkpoint2");
JobClient.runJob(twokenJobConf);
System.out.println("checkpoint3");
}
public static class GetTweets implements Writable, DBWritable {
String strTweet;
public GetTweets() {
}
public void readFields(DataInput in) throws IOException {
System.out.println("checkpoint 2a");
this.strTweet = Text.readString(in);
}
public void readFields(ResultSet resultSet) throws SQLException {
System.out.println("checkpoint 3a");
// this.id = resultSet.getLong(1);
this.strTweet = resultSet.getString(1);
}
public void write(DataOutput out) throws IOException {
}
public void write(PreparedStatement stmt) throws SQLException {
}
}
}
rv@ramanujan:~$ hadoop jar Twit.jar
Warning: $HADOOP_HOME is deprecated.
checkpoint1
checkpoint2
13/03/22 17:16:12 WARN mapred.JobClient: Use GenericOptionsParser for parsing the arguments. Applications should implement Tool for the same.
13/03/22 17:16:12 INFO mapred.JobClient: Cleaning up the staging area hdfs://localhost:54310/home/rv/hadoopfiles/mapred/staging/rv/.staging/job_201303221600_0008
Exception in thread "main" java.lang.RuntimeException: Error in configuring object
at org.apache.hadoop.util.ReflectionUtils.setJobConf(ReflectionUtils.java:93)
at org.apache.hadoop.util.ReflectionUtils.setConf(ReflectionUtils.java:64)
at org.apache.hadoop.util.ReflectionUtils.newInstance(ReflectionUtils.java:117)
at org.apache.hadoop.mapred.JobConf.getInputFormat(JobConf.java:575)
at org.apache.hadoop.mapred.JobClient.writeOldSplits(JobClient.java:981)
at org.apache.hadoop.mapred.JobClient.writeSplits(JobClient.java:973)
at org.apache.hadoop.mapred.JobClient.access$600(JobClient.java:172)
at org.apache.hadoop.mapred.JobClient$2.run(JobClient.java:889)
at org.apache.hadoop.mapred.JobClient$2.run(JobClient.java:842)
at java.security.AccessController.doPrivileged(Native Method)
at javax.security.auth.Subject.doAs(Subject.java:416)
at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1059)
at org.apache.hadoop.mapred.JobClient.submitJobInternal(JobClient.java:842)
at org.apache.hadoop.mapred.JobClient.submitJob(JobClient.java:816)
at org.apache.hadoop.mapred.JobClient.runJob(JobClient.java:1253)
at TweetWordCount.main(TweetWordCount.java:107)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:616)
at org.apache.hadoop.util.RunJar.main(RunJar.java:156)
Caused by: java.lang.reflect.InvocationTargetException
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:616)
at org.apache.hadoop.util.ReflectionUtils.setJobConf(ReflectionUtils.java:88)
... 20 more
Caused by: java.lang.RuntimeException: java.lang.ClassNotFoundException: com.mysql.jdbc.Driver
at org.apache.hadoop.mapred.lib.db.DBInputFormat.configure(DBInputFormat.java:271)
... 25 more
Caused by: java.lang.ClassNotFoundException: com.mysql.jdbc.Driver
at java.net.URLClassLoader$1.run(URLClassLoader.java:217)
at java.security.AccessController.doPrivileged(Native Method)
at java.net.URLClassLoader.findClass(URLClassLoader.java:205)
at java.lang.ClassLoader.loadClass(ClassLoader.java:321)
at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:294)
at java.lang.ClassLoader.loadClass(ClassLoader.java:266)
at java.lang.Class.forName0(Native Method)
at java.lang.Class.forName(Class.java:188)
at org.apache.hadoop.mapred.lib.db.DBConfiguration.getConnection(DBConfiguration.java:123)
at org.apache.hadoop.mapred.lib.db.DBInputFormat.configure(DBInputFormat.java:266)
... 25 more