情况
我目前正在使用 AVRO 和模式存储库编写消费者/生产者。
根据我收集的信息,我序列化这些数据的选项是使用 Confluent 的 avro 序列化程序,或者使用 Twitter 的 Bijection。
双射似乎是最直接的。
所以我想以以下格式生成日期ProducerRecord[String,Array[Byte]]
,这归结为 [some string ID, serialized GenericRecord]
(注意:我要使用通用记录,因为此代码库必须处理从 Json/csv/... 解析的数千个模式)
问题:
我序列化和使用 AVRO 的全部原因是你不需要在数据本身中有一个模式(就像你对 Json/XML/... 一样)。
然而,当检查主题中的数据时,我看到整个方案与数据一起包含。我做错了什么,这是设计使然,还是应该使用融合序列化程序?
代码:
def jsonStringToAvro(jString: String, schema: Schema): GenericRecord = {
val converter = new JsonAvroConverter
val genericRecord = converter.convertToGenericDataRecord(jString.replaceAll("\\\\/","_").getBytes(), schema)
genericRecord
}
def serializeAsByteArray(avroRecord: GenericRecord): Array[Byte] = {
//val genericRecordInjection = GenericAvroCodecs.toBinary(avroRecord.getSchema)
val r: Array[Byte] = GenericAvroCodecs.toBinary(avroRecord.getSchema).apply(avroRecord)
r
}
//schema comes from a rest call to the schema repository
new ProducerRecord[String, Array[Byte]](topic, myStringKeyGoesHere, serializeAsByteArray(jsonStringToAvro(jsonObjectAsStringGoesHere, schema)))
producer.send(producerRecord, new Callback {...})