
I wrote a small Hadoop map program to parse (with regular expressions) information out of log files generated by another application. I found this article http://www.nearinfinity.com//blogs/stephen_mouring_jr/2013/01/04/writing-hive-tables-from-mapreduce.html which explains how to parse the data and write it into a Hive table.

Here is my code:

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.GenericOptionsParser;
import org.apache.hadoop.io.BytesWritable;
import org.apache.hadoop.mapred.OutputCollector;

import java.io.IOException;
import java.util.ArrayList;
import java.util.StringTokenizer;

public class ParseDataToDB {
    public static final String SEPARATOR_FIELD = new String(new char[] {1});
    public static final String SEPARATOR_ARRAY_VALUE = new String(new char[] {2});
    public static final BytesWritable NULL_KEY =  new BytesWritable();

    public static class MyMapper extends Mapper<LongWritable, Text, BytesWritable, Text>  {
        //private final static IntWritable one = new IntWritable(1);
        private Text word = new Text();
        private ArrayList<String> bazValues = new ArrayList<String>();

        public void map(LongWritable key, Text value,
                OutputCollector<BytesWritable, Text> context)
                throws IOException {
            String line = value.toString();
            StringTokenizer tokenizer = new StringTokenizer(line);
            while(tokenizer.hasMoreTokens()){
                word.set(tokenizer.nextToken());
                if(word.find("extract") > -1) {
                    System.out.println("in herer");
                    bazValues.add(line);
                }
            }
            // Build up the array values as a delimited string.
            StringBuilder bazValueBuilder = new StringBuilder();
            int i = 0;
            for (String bazValue : bazValues) {
                bazValueBuilder.append(bazValue);
                ++i;
                if (i < bazValues.size()) {
                    bazValueBuilder.append(SEPARATOR_ARRAY_VALUE);
                }
            }

            // Build up the column values / fields as a delimited string.
            String hiveRow = new String();
            hiveRow += "fooValue";
            hiveRow += SEPARATOR_FIELD;
            hiveRow += "barValue";
            hiveRow += SEPARATOR_FIELD;
            hiveRow += bazValueBuilder.toString();
            System.out.println("in herer hiveRow" + hiveRow);

//          StringBuilder hiveRow = new StringBuilder();
//          hiveRow.append("fooValue");
//          hiveRow.append(SEPARATOR_FIELD);
//          hiveRow.append("barValue");
//          hiveRow.append(SEPARATOR_FIELD);
//          hiveRow.append(bazValueBuilder.toString());

            // Emit a null key and a Text object containing the delimited fields
            context.collect(NULL_KEY, new Text(hiveRow));           
        }
    } 


    public static void main(String[] args) throws IOException, InterruptedException, ClassNotFoundException {

        Configuration conf = new Configuration();       
        String[] otherArgs = new GenericOptionsParser(conf, args).getRemainingArgs();
        Job job = new Job(conf, "MyTest");
        job.setJarByClass(ParseDataToDB.class);
        job.setMapperClass(MyMapper.class);

        job.setMapOutputKeyClass(BytesWritable.class);
        job.setMapOutputValueClass(Text.class);

        job.setOutputKeyClass(BytesWritable.class);
        job.setOutputValueClass(Text.class);


        FileInputFormat.addInputPath(job, new Path(otherArgs[0]));
        FileOutputFormat.setOutputPath(job, new Path(otherArgs[1]));
        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }
}

But when I run this application I get the error message "Expected BytesWritable but received LongWritable". Can someone tell me what I am doing wrong? I am new to Hadoop programming. I am also open to creating an external table and pointing it at HDFS, but I am struggling with that implementation as well. Thanks.
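
To show what I mean by the external-table approach: going by the separators my mapper writes (\001 between fields, \002 between array items), I believe the table definition would look roughly like the sketch below. The table name, column names, JDBC URL, and HDFS location are only placeholders, and it assumes HiveServer2 with its JDBC driver on the classpath (the same CREATE statement could also be run from the hive shell). I have not gotten this part working yet.

import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.Statement;

public class CreateParsedLogsTable {
    public static void main(String[] args) throws Exception {
        // HiveServer2 JDBC driver (assumed to be on the classpath).
        Class.forName("org.apache.hive.jdbc.HiveDriver");
        try (Connection con = DriverManager.getConnection(
                 "jdbc:hive2://localhost:10000/default", "", "");
             Statement stmt = con.createStatement()) {
            // Placeholder table/column names and HDFS path; the delimiters match
            // SEPARATOR_FIELD (\001) and SEPARATOR_ARRAY_VALUE (\002) used above.
            stmt.execute(
                "CREATE EXTERNAL TABLE IF NOT EXISTS parsed_logs ("
                + "  foo STRING,"
                + "  bar STRING,"
                + "  baz ARRAY<STRING>)"
                + " ROW FORMAT DELIMITED"
                + " FIELDS TERMINATED BY '\\001'"
                + " COLLECTION ITEMS TERMINATED BY '\\002'"
                + " STORED AS TEXTFILE"
                + " LOCATION '/user/hadoop/parsed-logs-output'");
        }
    }
}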


2 Answers


I think that when you want to emit NULL as the key from the map, you can use NullWritable. Also note that with the new (mapreduce) API the map method has to override map(LongWritable, Text, Context); a method that takes an OutputCollector never overrides it, so the default identity mapper runs and emits the LongWritable file offsets as keys, which is exactly the type mismatch you are seeing. So your code would look like this:

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.GenericOptionsParser;
import org.apache.hadoop.io.NullWritable;

import java.io.IOException;
import java.util.ArrayList;
import java.util.StringTokenizer;

public class ParseDataToDB {
    public static final String SEPARATOR_FIELD = new String(new char[] {1});
    public static final String SEPARATOR_ARRAY_VALUE = new String(new char[] {2});

    public static class MyMapper extends Mapper<LongWritable, Text, NullWritable, Text> {
        private Text word = new Text();
        private ArrayList<String> bazValues = new ArrayList<String>();

        // With the new (mapreduce) API the third parameter must be Context,
        // otherwise this method does not override Mapper.map and is never called.
        @Override
        public void map(LongWritable key, Text value, Context context)
                throws IOException, InterruptedException {
            String line = value.toString();
            StringTokenizer tokenizer = new StringTokenizer(line);
            while (tokenizer.hasMoreTokens()) {
                word.set(tokenizer.nextToken());
                if (word.find("extract") > -1) {
                    bazValues.add(line);
                }
            }

            // Build up the array values as a delimited string.
            StringBuilder bazValueBuilder = new StringBuilder();
            int i = 0;
            for (String bazValue : bazValues) {
                bazValueBuilder.append(bazValue);
                ++i;
                if (i < bazValues.size()) {
                    bazValueBuilder.append(SEPARATOR_ARRAY_VALUE);
                }
            }

            // Build up the column values / fields as a delimited string.
            String hiveRow = "";
            hiveRow += "fooValue";
            hiveRow += SEPARATOR_FIELD;
            hiveRow += "barValue";
            hiveRow += SEPARATOR_FIELD;
            hiveRow += bazValueBuilder.toString();

            // Emit a NullWritable key and a Text object containing the delimited fields.
            context.write(NullWritable.get(), new Text(hiveRow));
        }
    }

    public static void main(String[] args) throws IOException, InterruptedException, ClassNotFoundException {
        Configuration conf = new Configuration();
        String[] otherArgs = new GenericOptionsParser(conf, args).getRemainingArgs();
        Job job = new Job(conf, "MyTest");
        job.setJarByClass(ParseDataToDB.class);
        job.setMapperClass(MyMapper.class);

        job.setMapOutputKeyClass(NullWritable.class);
        job.setMapOutputValueClass(Text.class);

        job.setOutputKeyClass(NullWritable.class);
        job.setOutputValueClass(Text.class);

        FileInputFormat.addInputPath(job, new Path(otherArgs[0]));
        FileOutputFormat.setOutputPath(job, new Path(otherArgs[1]));
        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }
}

Answered 2013-10-10T08:42:23.363

Looking at the article you provided (LINK), your NULL_KEY is never given a value. It should be:

public static final BytesWritable NULL_KEY = new BytesWritable(null);
Answered 2013-06-14T12:20:27.167