4

我遇到了简单的 spark 任务的问题,它读取 Avro 文件,然后将其保存为 Hive parquet 表。

我有 2 种类型的文件,通常它们是相同的,但关键结构有点不同 - 字段名称。

类型 1

root
|-- pk: strucnt (nullable = true)
    |-- term_id: string (nullale = true)

类型 2

root
|-- pk: strucnt (nullable = true)
    |-- id: string (nullale = true)

我正在使用 spark-avro 阅读 Avro。然后像这样将这个DF映射到bean

Dataset<SomeClass> df = avroDF.as(Encoders.bean(SomeClass.class));

SomeClass 是一个带有 getter 和 setter 的简单单字段类。

public class SomeClass{
    private String term_id;
    ...
}

因此,如果我正在阅读 Avro type 1 - 没关系。但是,如果我正在阅读 Avro 类型 2 - 就会发生错误。反之亦然,如果我将字段名称更改为private String id;

我的问题有什么通用的解决方案吗?我找到了@AvroName,但它不允许设置多个名称。谢谢。

4

2 回答 2

1

可能的解决方案是

StructType avroExtendedSchema = avroDF.schema().add("id",DataTypes.StringType);
avroDF.map(row->RowFactory(row.getStruct(0),row.getStruct(0).getString(0)), 
       RowEncoder.apply(avroExtendedSchema)).toDF();

所以 DF 的第二个字段将被命名为“id”并包含字符串键。将来可以删除第一个“pk”结构。

avroDF.drop("pk");

PS我找到了第三种模式:

root
|-- pk: strucnt (nullable = true)
    |-- id: int(nullale = true)

所以最终的代码是这样的:

DataType keyType = avroDF.select("pk.*").schema().fields[0].dataType();
StructType avroExtendedSchema = avroDF.schema().add("id",keyType);
avroDF.map(row->RowFactory(row.getStruct(0),row.getStruct(0).get(0)), 
       RowEncoder.apply(avroExtendedSchema)).drop("pk").toDF();

此代码适用于任何原语\String 键。

于 2018-01-29T13:16:16.047 回答
1

只有一种方法是将数据集字段名更改为模式中的名称。使用此示例执行此操作:

val newName = Seq("id", "x1", "x2", "x3")
Dataset<SomeClass> df = avroDF.toDF(newNames: _*).as(Encoders.bean(SomeClass.class));

您不能将数据框转换为具有不同字段名称的 BeanClass。

于 2018-01-29T08:55:46.083 回答