0

我正在尝试用下面的代码,通过 MapReduce 作业连接 MySQL 数据库,但遇到了下文贴出的错误。我在代码中加入了检查点输出,结果表明作业真正开始运行之前的部分都执行正常,之后作业就失败了……

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.util.Iterator;
import java.util.List;
import java.util.StringTokenizer;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.filecache.DistributedCache;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.mapred.FileInputFormat;
import org.apache.hadoop.mapred.FileOutputFormat;
import org.apache.hadoop.mapred.JobClient;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.MapReduceBase;
import org.apache.hadoop.mapred.Mapper;
import org.apache.hadoop.mapred.OutputCollector;
import org.apache.hadoop.mapred.Reducer;
import org.apache.hadoop.mapred.Reporter;
import org.apache.hadoop.mapred.SequenceFileOutputFormat;
import org.apache.hadoop.mapred.TextOutputFormat;
import org.apache.hadoop.mapred.lib.db.DBConfiguration;
//import org.apache.hadoop.mapred.lib.db.DBInputFormat;
import org.apache.hadoop.mapred.lib.db.DBInputFormat;
import org.apache.hadoop.mapred.lib.db.DBOutputFormat;
import org.apache.hadoop.mapred.lib.db.DBWritable;


public class TweetWordCount {


    public static class TweetWordCountMapper extends MapReduceBase implements
            Mapper<LongWritable, GetTweets, Text, IntWritable> {
        private final static IntWritable intTwordsCount = new IntWritable(1);
        private Text strTwoken = new Text();

        public void map(LongWritable key, GetTweets value,
                OutputCollector<Text, IntWritable> output, Reporter reporter)
                throws IOException {
            System.out.println("checkpoint4");
            GetTweets tweets = new GetTweets();
            tweets.strTweet = value.strTweet;
            //TwitterTokenizer twokenizer = new TwitterTokenizer();
            //List<String> twokens = twokenizer.twokenize(value.strTweet.toString());

            output.collect(new Text(value.strTweet.toString()), intTwordsCount);
            System.out.println("checkpoint5");

        }

    }


    public static class TweetWordCountReducer extends MapReduceBase implements
            Reducer<Text, IntWritable, Text, IntWritable> {
        public void reduce(Text key, Iterator<IntWritable> values,
                OutputCollector<Text, IntWritable> output, Reporter reporter)
                throws IOException {
            System.out.println("checkpoint6");
            int intTwokenCount = 0;
            while (values.hasNext()) {
                intTwokenCount += values.next().get();
            }
            output.collect(key, new IntWritable(intTwokenCount));
            System.out.println("checkpoint6");
        }
    }


    public static void main(String[] args) throws Exception {

        System.out.println("checkpoint1");
          JobConf twokenJobConf = new JobConf(new Configuration(),TweetWordCount.class);
          //JobConf twokenJobConf = new JobConf(TweetWordCount.class);
          twokenJobConf.setJobName("twoken_count");

          twokenJobConf.setInputFormat(DBInputFormat.class); //Set input format here
          twokenJobConf.setOutputFormat(TextOutputFormat.class);// Sets the output format

          Object out = new Path("twokens");

          twokenJobConf.setMapperClass(TweetWordCountMapper.class);
          twokenJobConf.setCombinerClass(TweetWordCountReducer.class);
          twokenJobConf.setReducerClass(TweetWordCountReducer.class);

          twokenJobConf.setOutputKeyClass(Text.class);
          twokenJobConf.setOutputValueClass(IntWritable.class);

          DBConfiguration.configureDB(twokenJobConf, "com.mysql.jdbc.Driver",
                  "jdbc:mysql://localhost/test", "root", "root"); //Specifies the DB configuration

          String[] fields = {"Tweet"}; //Specifies the Fields to be fetched from DB
          DBInputFormat.setInput(twokenJobConf, GetTweets.class, "NewGamil",
                  null /* conditions */, "Tweet", fields); // Specifies the DB table and fields

          //SequenceFileOutputFormat.setOutputPath(twokenJobConf, (Path) out);
          FileOutputFormat.setOutputPath(twokenJobConf, (Path) out);
          System.out.println("checkpoint2");
          JobClient.runJob(twokenJobConf);
          System.out.println("checkpoint3");

    }


    public static class GetTweets implements Writable, DBWritable {
        String strTweet;

        public GetTweets() {

        }

        public void readFields(DataInput in) throws IOException {
            System.out.println("checkpoint 2a");
            this.strTweet = Text.readString(in);
        }

        public void readFields(ResultSet resultSet) throws SQLException {
            System.out.println("checkpoint 3a");
            // this.id = resultSet.getLong(1);
            this.strTweet = resultSet.getString(1);
        }

        public void write(DataOutput out) throws IOException {

        }

        public void write(PreparedStatement stmt) throws SQLException {

        }

    }


}


rv@ramanujan:~$ hadoop jar Twit.jar 
Warning: $HADOOP_HOME is deprecated.

checkpoint1
checkpoint2
13/03/22 17:16:12 WARN mapred.JobClient: Use GenericOptionsParser for parsing the arguments. Applications should implement Tool for the same.
13/03/22 17:16:12 INFO mapred.JobClient: Cleaning up the staging area hdfs://localhost:54310/home/rv/hadoopfiles/mapred/staging/rv/.staging/job_201303221600_0008
Exception in thread "main" java.lang.RuntimeException: Error in configuring object
    at org.apache.hadoop.util.ReflectionUtils.setJobConf(ReflectionUtils.java:93)
    at org.apache.hadoop.util.ReflectionUtils.setConf(ReflectionUtils.java:64)
    at org.apache.hadoop.util.ReflectionUtils.newInstance(ReflectionUtils.java:117)
    at org.apache.hadoop.mapred.JobConf.getInputFormat(JobConf.java:575)
    at org.apache.hadoop.mapred.JobClient.writeOldSplits(JobClient.java:981)
    at org.apache.hadoop.mapred.JobClient.writeSplits(JobClient.java:973)
    at org.apache.hadoop.mapred.JobClient.access$600(JobClient.java:172)
    at org.apache.hadoop.mapred.JobClient$2.run(JobClient.java:889)
    at org.apache.hadoop.mapred.JobClient$2.run(JobClient.java:842)
    at java.security.AccessController.doPrivileged(Native Method)
    at javax.security.auth.Subject.doAs(Subject.java:416)
    at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1059)
    at org.apache.hadoop.mapred.JobClient.submitJobInternal(JobClient.java:842)
    at org.apache.hadoop.mapred.JobClient.submitJob(JobClient.java:816)
    at org.apache.hadoop.mapred.JobClient.runJob(JobClient.java:1253)
    at TweetWordCount.main(TweetWordCount.java:107)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:616)
    at org.apache.hadoop.util.RunJar.main(RunJar.java:156)
Caused by: java.lang.reflect.InvocationTargetException
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:616)
    at org.apache.hadoop.util.ReflectionUtils.setJobConf(ReflectionUtils.java:88)
    ... 20 more
Caused by: java.lang.RuntimeException: java.lang.ClassNotFoundException: com.mysql.jdbc.Driver
    at org.apache.hadoop.mapred.lib.db.DBInputFormat.configure(DBInputFormat.java:271)
    ... 25 more
Caused by: java.lang.ClassNotFoundException: com.mysql.jdbc.Driver
    at java.net.URLClassLoader$1.run(URLClassLoader.java:217)
    at java.security.AccessController.doPrivileged(Native Method)
    at java.net.URLClassLoader.findClass(URLClassLoader.java:205)
    at java.lang.ClassLoader.loadClass(ClassLoader.java:321)
    at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:294)
    at java.lang.ClassLoader.loadClass(ClassLoader.java:266)
    at java.lang.Class.forName0(Native Method)
    at java.lang.Class.forName(Class.java:188)
    at org.apache.hadoop.mapred.lib.db.DBConfiguration.getConnection(DBConfiguration.java:123)
    at org.apache.hadoop.mapred.lib.db.DBInputFormat.configure(DBInputFormat.java:266)
    ... 25 more
4

2 回答 2

2

确保在命令中使用 -libjars。如下所示: bin/hadoop jar wordcount.jar org.myorg.DBCountPageView -libjars mysql-connector-java-5.1.26-bin.jar

请参考 http://hadoop.apache.org/docs/r1.2.1/mapred_tutorial.html 中的“Usage”(用法)一节。

于 2014-09-05T20:09:12.397 回答
1

不确定您的应用程序是出于什么目的(学习、开发等),但我建议使用 Sqoop 与 MySQL 等关系型数据库进行交互。

于 2013-03-22T13:39:32.970 回答