0

我有一个通过 Oozie 工作流 XML 配置的 MapReduce 作业,它以序列文件格式(org.apache.hadoop.mapreduce.lib.output.SequenceFileOutputFormat)写入输出。是否有类似的输出格式类可以把结果保存为 Parquet 格式?我在 https://hadoop.apache.org/docs/stable/api/org/apache/hadoop/mapreduce/lib/output/package-summary.html 下没有找到任何相关的类。

还是我应该使用不同的方法?

请指教 。

谢谢

映射器:

    import org.apache.avro.Schema;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

import java.io.IOException;
import java.util.StringTokenizer;


/**
 * Word-count mapper: splits each input line into tokens and emits a
 * {@code (token, 1)} pair for every token.
 *
 * <p>Tokens are produced by {@link StringTokenizer} with its default delimiters
 * (space, tab, newline, carriage return, form feed).
 */
public class TestMapper1 extends Mapper<LongWritable, Text, Text, IntWritable>{

    /** Count emitted for every token; it is only ever read, never modified. */
    private static final IntWritable ONE = new IntWritable(1);

    /**
     * Output key reused across tokens instead of allocating a new {@code Text}
     * per token, following the pattern of the Hadoop WordCount tutorial example.
     */
    private final Text word = new Text();

    /**
     * Emits one {@code (token, 1)} pair per token found in {@code value}.
     *
     * @param key     input key supplied by the framework; not used here
     * @param value   one input record whose text is tokenized
     * @param context sink for the emitted pairs
     * @throws IOException          if writing to the context fails
     * @throws InterruptedException if the task is interrupted while writing
     */
    @Override
    public void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
        StringTokenizer tokens = new StringTokenizer(value.toString());

        while (tokens.hasMoreTokens()) {
            word.set(tokens.nextToken());
            context.write(word, ONE);
        }
    }

}

Reducer:

import org.apache.avro.Schema;
import org.apache.avro.generic.GenericData;
import org.apache.avro.generic.GenericRecord;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

import java.io.IOException;

/**
 * Word-count reducer that sums the counts received for a key and writes the
 * total as a single-field Avro {@link GenericRecord}.
 *
 * <p>The record is written with a {@code null} output key; only the value
 * (the record) carries data.
 *
 * <p>NOTE(review): the input key (the word) is not stored in the record, so the
 * output contains only totals. Confirm this is intended; adding the word would
 * require a second field in the schema here and in the driver.
 */
public  class TestReducer1 extends Reducer<Text, IntWritable, Void, GenericRecord> {

    /** Avro schema of the emitted record: one string field named {@code col1}. */
    private static final Schema MAPPING_SCHEMA = new Schema.Parser().parse(
            "{\n" +
                    "    \"type\":    \"record\",\n" +
                    "    \"name\":    \"TextFile\",\n" +
                    "    \"doc\":    \"Text File\",\n" +
                    "    \"fields\":\n" +
                    "    [\n" +
                    "            {\"name\":    \"col1\", \"type\":    \"string\"}\n"+
                    "    ]\n"+
                    "}\n");

    /**
     * Sums all counts for {@code key} and writes the total, rendered as a
     * decimal string, into field {@code col1} of a new record.
     *
     * @param key     the grouped key; not written to the output
     * @param values  the counts to add up
     * @param context sink for the emitted record
     * @throws IOException          if writing to the context fails
     * @throws InterruptedException if the task is interrupted while writing
     */
    @Override
    public void reduce(Text key, Iterable<IntWritable> values, Context context)
            throws IOException, InterruptedException {
        // Accumulate in a long: an int total would silently wrap around for keys
        // with more than Integer.MAX_VALUE occurrences.
        long sum = 0L;
        for (IntWritable count : values) {
            sum += count.get();
        }

        GenericRecord record = new GenericData.Record(MAPPING_SCHEMA);
        record.put("col1", String.valueOf(sum));
        context.write(null, record);
    }
}

Driver(驱动程序):

import org.apache.avro.Schema;
import org.apache.avro.generic.GenericRecord;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.parquet.avro.AvroParquetOutputFormat;


/**
 * Command-line driver that configures and runs the word-count job built from
 * {@code TestMapper1} and {@code TestReducer1}, writing the result through
 * {@code AvroParquetOutputFormat}.
 */
public class Driver1 {

    /**
     * Configures the job, runs it, and exits the JVM with the job's status.
     *
     * <p>Exit codes: {@code 2} when the argument count is wrong, {@code 0} when
     * the job succeeds, {@code 1} when it fails.
     *
     * @param args exactly two entries: the input path and the output path
     * @throws Exception if job setup or execution fails
     */
    public static void main(String[] args) throws Exception {

        // Validate the command line first so a bad invocation fails before any setup work.
        if (args.length != 2) {
            System.err.println("Usage: WordCount <InPath> <OutPath>");
            System.exit(2);
        }

        // Schema of the records written to the output; it duplicates the schema
        // that TestReducer1 uses to build its records, so the two must stay in sync.
        Schema mappingSchema = new Schema.Parser().parse(
                "{\n" +
                        "    \"type\":    \"record\",\n" +
                        "    \"name\":    \"TextFile\",\n" +
                        "    \"doc\":    \"Text File\",\n" +
                        "    \"fields\":\n" +
                        "    [\n" +
                        "            {\"name\":    \"col1\", \"type\":    \"string\"}\n"+
                        "    ]\n"+
                        "}\n");

        Configuration conf = new Configuration();
        Job job = Job.getInstance(conf, "ParquetConvert WordCount");
        job.setJarByClass(TestMapper1.class);
        job.setMapperClass(TestMapper1.class);
        job.setReducerClass(TestReducer1.class);
        job.setNumReduceTasks(1);

        // The mapper and reducer emit different types, so declare the intermediate
        // (map output) types separately from the final job output types.
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(IntWritable.class);
        job.setOutputKeyClass(Void.class);
        job.setOutputValueClass(GenericRecord.class);

        job.setOutputFormatClass(AvroParquetOutputFormat.class);
        // Register the Avro schema with the Parquet output format.
        AvroParquetOutputFormat.setSchema(job, mappingSchema);
        FileInputFormat.addInputPath(job, new Path(args[0]));
        FileOutputFormat.setOutputPath(job, new Path(args[1]));

        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }

}
4

1 回答 1

0

Oozie 不应控制 MapReduce 作业中包含哪些库。

ParquetOutputFormat没有内置在 Hadoop 中。

它由 Apache Parquet 项目的 parquet-hadoop 库提供。

Maven 依赖:

<dependency>
    <groupId>org.apache.parquet</groupId>
    <artifactId>parquet-hadoop</artifactId>
    <version>${parquet.version}</version>
</dependency>
于 2021-08-05T15:30:47.807 回答