0

情况

我目前正在使用 AVRO 和模式存储库编写消费者/生产者。

根据我收集的信息,我序列化这些数据的选项是使用 Confluent 的 avro 序列化程序,或者使用 Twitter 的 Bijection。

双射似乎是最直接的。

所以我想以以下格式生成日期ProducerRecord[String,Array[Byte]],这归结为 [some string ID, serialized GenericRecord]

(注意:我要使用通用记录,因为此代码库必须处理从 Json/csv/... 解析的数千个模式)

问题:

我序列化和使用 AVRO 的全部原因是你不需要在数据本身中有一个模式(就像你对 Json/XML/... 一样)。
然而,当检查主题中的数据时,我看到整个方案与数据一起包含。我做错了什么,这是设计使然,还是应该使用融合序列化程序?

代码:

  def jsonStringToAvro(jString: String, schema: Schema): GenericRecord = {
    val converter = new JsonAvroConverter
    val genericRecord = converter.convertToGenericDataRecord(jString.replaceAll("\\\\/","_").getBytes(), schema)

    genericRecord
  }
def serializeAsByteArray(avroRecord: GenericRecord): Array[Byte] = {
    //val genericRecordInjection = GenericAvroCodecs.toBinary(avroRecord.getSchema)
    val r: Array[Byte] = GenericAvroCodecs.toBinary(avroRecord.getSchema).apply(avroRecord)

    r
  }

//schema comes from a rest call to the schema repository
new ProducerRecord[String, Array[Byte]](topic, myStringKeyGoesHere, serializeAsByteArray(jsonStringToAvro(jsonObjectAsStringGoesHere, schema)))


        producer.send(producerRecord, new Callback {...})
4

1 回答 1

2

如果您查看Confluent 源代码,您会看到与模式存储库交互的操作顺序是

  1. 从 Avro 记录中获取模式,并计算其 ID。理想情况下,将 Schema 发布到存储库,或者以其他方式散列它应该给你一个 ID。
  2. 分配一个字节缓冲区
  3. 将返回的 ID 写入缓冲区
  4. 将 Avro 对象值(不包括架构)作为字节写入缓冲区
  5. 将该字节缓冲区发送到 Kafka

目前,您的 Bijection 使用将在字节中包含架构,而不是用 ID 替换它

于 2018-08-01T13:49:38.557 回答