0

我在 S3 中有一些遗留数据,我想使用 Spark 2 使用 Java API 将其转换为镶木地板格式。

我有所需的 Avro 模式(.avsc 文件)和它们使用 Avro 编译器生成的 Java 类,我想使用 Parquet 格式的这些模式存储数据。输入数据不是任何标准格式,但我有一个库,可以将旧文件中的每一行转换为 Avro 类。

是否可以将数据读取为JavaRDD<String>,使用库将转换应用于 Avro 类,最后以镶木地板格式存储。

就像是:

JavaRDD<String> rdd = javaSparkContext.textFile("s3://bucket/path_to_legacy_files");    
JavaRDD<MyAvroClass> converted = rdd.map(line -> customLib.convertToAvro(line));    
converted.saveAsParquet("s3://bucket/destination"); //how do I do this

像上面这样的事情可行吗?稍后我想使用 Hive、Presto 和 Spark 处理转换后的 parquet 数据。

4

2 回答 2

1

暂时忽略 S3;这是一个生产细节。您需要从更简单的问题开始“将我格式的本地文件转换为标准文件”。这是您可以通过针对一小部分数据样本集的单元测试在本地实现的。

这在 Spark 中通常与 Hadoop Mapreduce 相同:实现 or 的子类,InputFormat<K, V>或者FileInputFormat<K, V>使用 Hadoop 的org.apache.hadoop.streaming.mapreduce.StreamInputFormat输入格式,实现您自己的 RecordReader,然后将选项设置spark.hadoop.stream.recordreader.class为您的记录阅读器的类名(可能是最简单的)。

有很多关于此的文档,以及堆栈溢出问题。源代码树本身中有很多示例。

于 2017-01-23T11:20:45.840 回答
0

想通了,基本上是史蒂夫提到的方法,除了 Hadoop 输入和输出格式已经存在:

         Job job = new Job();
         ParquetOutputFormat.setWriteSupportClass(job, AvroWriteSupport.class);
         AvroParquetOutputFormat.setSchema(job, MyAvroType.SCHEMA$);
         AvroParquetOutputFormat.setBlockSize(job, 128*1024*1024);
         AvroParquetOutputFormat.setCompression(job, CompressionCodecName.SNAPPY);
         AvroParquetOutputFormat.setCompressOutput(job, true);

         sparkContext.textFile("s3://bucket/path_to_legacy_files")
            .map(line -> customLib.convertToAvro(line))
            .mapToPair(record -> new Tuple2<Void, MyAvroType>(null, record))
            .saveAsNewAPIHadoopFile(
                "s3://bucket/destination", 
                Void.class, 
                MyAvroType.class,
                new ParquetOutputFormat<MyAvroType>().getClass(), 
                job.getConfiguration());
于 2017-01-23T15:55:47.393 回答