
I have the following Avro schema:

{
    "type":"record",
    "name":"test",
    "namespace":"test.name",
    "fields":[
        {"name":"items","type":
            {"type":"array",
                "items":
                    {"type":"record","name":"items",
                        "fields":[
                                {"name":"name","type":"string"},
                                {"name":"state","type":"string"}
                            ]
                    }
            }
        },
        {"name":"firstname","type":"string"}
    ]
}

When I encode JSON data using a JSON decoder and an Avro binary encoder:

import java.io.ByteArrayOutputStream

import org.apache.avro.generic.{GenericDatumReader, GenericDatumWriter, GenericRecord}
import org.apache.avro.io.{DecoderFactory, EncoderFactory, JsonDecoder}

// read the JSON input with a JsonDecoder, then re-encode the datum as Avro binary
val writer = new GenericDatumWriter[GenericRecord](schema)
val reader = new GenericDatumReader[GenericRecord](schema)
val baos = new ByteArrayOutputStream
val decoder: JsonDecoder = DecoderFactory.get.jsonDecoder(schema, json)
val encoder = EncoderFactory.get.binaryEncoder(baos, null)
val datum = reader.read(null, decoder)
writer.write(datum, encoder)
encoder.flush()
val avroByteArray = baos.toByteArray

Scenario 1: When I pass the following JSON for encoding, it works fine:

{
  "items": [
    {
      "name": "dallas",
      "state": "TX"
    }
  ],
  "firstname":"arun"
}

Scenario 2: When I pass an additional attribute (lastname) at the root level of the JSON, it still encodes and works fine:

{
  "items": [
    {
      "name": "dallas",
      "state": "TX"
    }
  ],
  "firstname":"fname",
  "lastname":"lname"
}

Scenario 3: When I add an additional attribute (country) inside the array record, it throws the following exception:

Expected record-end. Got FIELD_NAME
org.apache.avro.AvroTypeException: Expected record-end. Got FIELD_NAME
    at org.apache.avro.io.JsonDecoder.error(JsonDecoder.java:698)
{
  "items": [
    {
      "name": "dallas",
      "state": "TX",
      "country":"USA"
    }
  ],
  "firstname":"fname",
  "lastname":"lname"
}

I need to get scenario #3 working; any help would be great.


2 Answers


Converting the JSON data into the shape of its Avro schema via the Spark DataFrame approach will help:

  1. Create a struct type from the Avro schema using SchemaConverters
  2. Create a DataFrame from that struct type and a JSON RDD of strings
  3. Convert the DataFrame rows back to JSON using df.toJSON

Since the DataFrame reader only materializes the columns declared in the schema, extra attributes such as lastname and country are silently dropped. A compact sketch of the three steps follows, with full test cases after it.
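A minimal sketch of these steps, assuming an existing SparkSession named spark, the parsed Avro schema and input json string from the question, and the com.databricks:spark-avro dependency on the classpath:

import com.databricks.spark.avro.SchemaConverters
import org.apache.spark.sql.types.StructType

// step 1: derive the Spark struct type from the Avro schema
val structType: StructType =
  SchemaConverters.toSqlType(schema).dataType.asInstanceOf[StructType]

// step 2: parse the JSON with that schema; fields not in the schema are dropped
val df = spark.read.schema(structType).json(spark.sparkContext.parallelize(Seq(json)))

// step 3: serialize the rows back to schema-conforming JSON
val sanitizedJson: String = df.toJSON.first()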

Sample test cases:

import java.io.ByteArrayOutputStream

import com.databricks.spark.avro.SchemaConverters
import org.apache.avro.Schema
import org.apache.avro.Schema.Parser
import org.apache.avro.generic._
import org.apache.avro.io._
import org.apache.spark.SparkContext
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.SparkSession.Builder
import org.apache.spark.sql._
import org.apache.spark.sql.types.StructType
import org.scalatest.{Matchers, WordSpecLike}

class Test extends WordSpecLike
  with Matchers {

  val schemaString: String =
    """{
      |    "type":"record",
      |    "name":"test",
      |    "namespace":"test.name",
      |    "fields":[
      |        {"name":"items","type":
      |            {"type":"array",
      |                "items":
      |                    {"type":"record","name":"items",
      |                        "fields":[
      |                                {"name":"name","type":"string"},
      |                                {"name":"state","type":"string"}
      |                            ]
      |                    }
      |            }
      |        },
      |        {"name":"firstname","type":"string"}
      |    ]
      |}""".stripMargin

  // create spark session and sql context
  val builder: Builder = SparkSession.builder.appName("testAvroSpark")
  val sparkSession: SparkSession = builder.master("local[1]").getOrCreate()
  val sc: SparkContext = sparkSession.sparkContext
  val sqlContext: SQLContext = sparkSession.sqlContext

  // avro schema from json type schema string
  val schema: Schema = new Parser().parse(schemaString)

  // get spark struct type from avro schema
  val requiredType: StructType =
    SchemaConverters.toSqlType(schema).dataType.asInstanceOf[StructType]

  "scenario one json data with given schema" in {
    val scenarioOneJson: String =
      """{
        |  "items": [
        |    {
        |      "name": "dallas",
        |      "state": "TX"
        |    }
        |  ],
        |  "firstname":"rumesh"
        |}""".stripMargin

    val jsonRdd: RDD[String] = sc.parallelize(Seq(scenarioOneJson))

    val outputJsonExpected: String =
      """{"items":[{"name":"dallas","state":"TX"}],"firstname":"rumesh"}"""

    val resultJson: String = customJsonConverter(requiredType, jsonRdd).head

    assert(resultJson === outputJsonExpected)
    assert(binaryEncoder(schema, outputJsonExpected) === binaryEncoder(schema, resultJson))
  }

  "scenario two json data with given schema" in {
    val scenarioTwoJson: String =
      """{
        |  "items": [
        |    {
        |      "name": "dallas",
        |      "state": "TX"
        |    }
        |  ],
        |  "firstname":"rumesh",
        |  "lastname":"krish"
        |}""".stripMargin

    val jsonRdd: RDD[String] = sc.parallelize(Seq(scenarioTwoJson))

    val outputJsonExpected: String =
      """{"items":[{"name":"dallas","state":"TX"}],"firstname":"rumesh"}"""

    val resultJson: String = customJsonConverter(requiredType, jsonRdd).head

    assert(resultJson === outputJsonExpected)
    assert(binaryEncoder(schema, outputJsonExpected) === binaryEncoder(schema, resultJson))
  }

  "scenario three json data with given schema" in {
    val scenarioThreeJson: String =
      """{
        |  "items": [
        |    {
        |      "name": "dallas",
        |      "state": "TX",
        |      "country":"USA"
        |    }
        |  ],
        |  "firstname":"rumesh",
        |  "lastname":"krish"
        |}""".stripMargin

    val jsonRdd: RDD[String] = sc.parallelize(Seq(scenarioThreeJson))

    val outputJsonExpected: String =
      """{"items":[{"name":"dallas","state":"TX"}],"firstname":"rumesh"}"""

    val resultJson: String = customJsonConverter(requiredType, jsonRdd).head

    assert(resultJson === outputJsonExpected)
    assert(binaryEncoder(schema, outputJsonExpected) === binaryEncoder(schema, resultJson))
  }

  /**
    * convert the json using data frame json parser with given schema struct type
    *
    * @param customType   given data frame struct type
    * @param jsonInputRdd json rdd string
    * @return
    */
  private def customJsonConverter(customType: StructType,
                                  jsonInputRdd: RDD[String]): List[String] = {
    // create data frame from rdd string with struct type schema
    val df: DataFrame = sqlContext.read.schema(customType).json(jsonInputRdd)

    // get the list of json string data frame
    df.toJSON.rdd.toLocalIterator.toList
  }


  /**
    * avro binary serialization
    *
    * @param avroSchema avro schema
    * @param jsonData   json data
    * @return
    */
  private def binaryEncoder(avroSchema: Schema, jsonData: String): Array[Byte] = {
    val writer = new GenericDatumWriter[GenericRecord](avroSchema)
    val reader = new GenericDatumReader[GenericRecord](avroSchema)
    val baos = new ByteArrayOutputStream
    val decoder: JsonDecoder = DecoderFactory.get.jsonDecoder(avroSchema, jsonData)
    val encoder = EncoderFactory.get.binaryEncoder(baos, null)
    val datum = reader.read(null, decoder)
    writer.write(datum, encoder)
    encoder.flush()
    baos.toByteArray
  }

}
answered 2018-02-10T05:27:47.570

Your schema does not reflect the structure in scenario 3: the "country" field is missing:

{"name":"country", "type":"string"}

You only declare the "name" and "state" fields. The decoder therefore correctly expects the (sub-)record to end after those and, as the error message says, it instead gets another field name ("country").
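For illustration, here is the question's schema with the missing field declared, per the fix above. (Note: declaring country instead as an optional union such as ["null","string"] would not accept the plain JSON in scenario 3, because Avro's JSON encoding expects union values to be wrapped, e.g. "country":{"string":"USA"}.)

{
    "type":"record",
    "name":"test",
    "namespace":"test.name",
    "fields":[
        {"name":"items","type":
            {"type":"array",
                "items":
                    {"type":"record","name":"items",
                        "fields":[
                                {"name":"name","type":"string"},
                                {"name":"state","type":"string"},
                                {"name":"country","type":"string"}
                            ]
                    }
            }
        },
        {"name":"firstname","type":"string"}
    ]
}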

By the way: you can always use a generator to derive a matching schema from your JSON; several are available on the web.

answered 2018-07-24T09:38:02.767