5

更新:spark-avro 包已更新以支持这种情况。https://github.com/databricks/spark-avro/releases/tag/v3.1.0

我有一个由我无法控制的第三方创建的 AVRO 文件,我需要使用 spark 处理它。AVRO 模式是其中一个字段是混合联合类型的记录:

{    
    "name" : "Properties",                              
    "type" : {                                          
    "type" : "map",                                   
    "values" : [ "long", "double", "string", "bytes" ]
}                                                   

spark-avro阅读器不支持此功能:

除了上面列出的类型,它还支持读取三种联合类型: union(int, long) union(float, double) union(something, null),其中something是上面列出的支持的Avro类型之一或者是支持的联合类型之一。

阅读有关 AVRO 的架构演变和解决方案,我希望能够通过指定省略此字段的不同读取器架构来读取文件,同时跳过有问题的字段。根据AVRO Schema Resolution docs,它应该可以工作:

如果作者的记录包含一个名称不存在于读者记录中的字段,则忽略该字段的作者值。

所以我修改使用

 val df = sqlContext.read.option("avroSchema", avroSchema).avro(path)

avroSchema作者使用的完全相同的架构在哪里,但没有有问题的字段。

但是关于混合联合类型,我仍然遇到同样的错误。

AVRO 是否支持这种模式演变场景?与 avro 火花?还有其他方法可以实现我的目标吗?


更新:我已经使用 Apache Avro 1.8.1 测试了相同的场景(实际上是相同的文件)并且它按预期工作。那么它必须专门与spark-avro. 有任何想法吗?

4

1 回答 1

5

更新:spark-avro 包已更新以支持这种情况。https://github.com/databricks/spark-avro/releases/tag/v3.1.0

这实际上并没有回答我的问题,而是针对同一问题的不同解决方案。

由于目前 spark-avro 没有此功能(请参阅我对问题的评论) - 我使用了 avro 的org.apache.avro.mapreduce和 spark 的newAPIHadoopFile。这是一个简单的例子:

val path = "..."
val conf = new SparkConf().setAppName("avro test")
  .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer") 
val sc = new SparkContext(conf)

val avroRdd = sc.newAPIHadoopFile(path,
  classOf[AvroKeyInputFormat[GenericRecord]],
  classOf[AvroKey[GenericRecord]],
  classOf[NullWritable])

与 spark-avro 不同的是,官方的 avro 库支持混合联合类型和模式演变。

于 2016-11-13T12:36:19.747 回答