1

我正在使用 hadoop-1.2.1，并尝试使用 ToolRunner 运行一个简单的 RowCount HBase 作业。但是无论我怎么尝试，Hadoop 都找不到 Mapper 类。jar 文件已被正确复制到 HDFS 中，但我无法弄清楚哪里出了问题。请帮忙！

这是代码:

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.filecache.DistributedCache;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.filter.FirstKeyOnlyFilter;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil;
import org.apache.hadoop.hbase.mapreduce.TableMapper;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.output.NullOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;


public class HBaseRowCountToolRunnerTest extends Configured implements Tool
{

    // What to copy.
    public static final String JAR_NAME = "myJar.jar";
    public static final String LOCAL_JAR = <path_to_jar> + JAR_NAME;
    public static final String REMOTE_JAR = "/tmp/"+JAR_NAME;


    public static void main(String[] args) throws Exception 
    {
        Configuration config = HBaseConfiguration.create();

//All connection configs set here -- omitted to post the code 

        config.set("tmpjars", REMOTE_JAR);


        FileSystem dfs = FileSystem.get(config);

        System.out.println("pathString = " + (new Path(LOCAL_JAR)).toString() + " \n");

        // Copy jar file to remote.
        dfs.copyFromLocalFile(new Path(LOCAL_JAR), new Path(REMOTE_JAR));

        // Get rid of jar file when we're done.
        dfs.deleteOnExit(new Path(REMOTE_JAR));

        // Run the job.
        System.exit(ToolRunner.run(config, new HBaseRowCountToolRunnerTest(), args));
    }

    @Override
    public int run(String[] args) throws Exception 
    {
        Job job = new RowCountJob(getConf(), "testJob", "myLittleHBaseTable");

        return job.waitForCompletion(true) ? 0 : 1;
    }

    public static class RowCountJob extends Job
    {

        RowCountJob(Configuration conf, String jobName, String tableName) throws IOException
        {
            super(conf, RowCountJob.class.getCanonicalName() + "_" + jobName);

            setJarByClass(getClass()); 

            Scan scan = new Scan();
            scan.setCacheBlocks(false);
            scan.setFilter(new FirstKeyOnlyFilter());

            setOutputFormatClass(NullOutputFormat.class);

            TableMapReduceUtil.initTableMapperJob(tableName, scan,
                    RowCounterMapper.class, ImmutableBytesWritable.class, Result.class, this);

            setNumReduceTasks(0);

        }

    }//end public static class RowCountJob extends Job

    //Mapper that runs the count
    //TableMapper -- TableMapper<KEYOUT, VALUEOUT> (*OUT by type)
    public static class RowCounterMapper extends TableMapper<ImmutableBytesWritable, Result> 
    {

        //Counter enumeration to count the actual rows
        public static enum Counters {ROWS}

        /**
         * Maps the data.
         *
         * @param row  The current table row key.
         * @param values  The columns.
         * @param context  The current context.
         * @throws IOException When something is broken with the data.
         * @see org.apache.hadoop.mapreduce.Mapper#map(KEYIN, VALUEIN,
         *   org.apache.hadoop.mapreduce.Mapper.Context)
         */
        @Override
        public void map(ImmutableBytesWritable row, Result values, Context context) throws IOException 
        {
            // Count every row containing data times 2, whether it's in qualifiers or values
            context.getCounter(Counters.ROWS).increment(2);
        }

    }//end public static class RowCounterMapper extends TableMapper<ImmutableBytesWritable, Result> 


}//end public static void main(String[] args) throws Exception
4

2 回答 2

1

好的-我找到了解决该问题的方法,并认为我会与所有其他有类似问题的人分享...

事实证明,我放弃了 tmpjars 配置选项,只是将 jar 文件从代码本身复制到了 DistributedCache 中。这是它的样子:

// Upload the job jar to HDFS and register it on the distributed-cache classpath.
// NOTE(review): assumes `conf`, `LOCAL_JAR`, and `REMOTE_JAR` are in scope from the
// surrounding driver code shown in the question.
// Copy jar file to remote.
FileSystem dfs = FileSystem.get(conf);
dfs.copyFromLocalFile(new Path(LOCAL_JAR), new Path(REMOTE_JAR));

// Get rid of jar file when we're done (deleted on client JVM exit).
dfs.deleteOnExit(new Path(REMOTE_JAR));

// Place it in the distributed cache so task JVMs get it on their classpath.
// NOTE(review): DistributedCache is deprecated in Hadoop 2.x+ in favor of
// Job.addFileToClassPath -- confirm against the Hadoop version in use.
DistributedCache.addFileToClassPath(new Path(REMOTE_JAR), conf, dfs);

也许它不能解决 tmpjars 的问题,但它确实有效。

于 2013-10-01T13:35:30.230 回答
0

今天也遇到了同样的问题,最后发现是因为忘记在驱动类中插入下面这句话了...

job.setJarByClass(HBaseTestDriver.class);
于 2017-08-26T15:45:28.880 回答