1

我正在寻找一个示例来对 AvroSpecificRecordBase对象进行类似于 a 的Bijection,GenericRecordBase或者是否有一种更简单的方法可以将该AvroSerializer类用作 Kafka 键和值序列化程序。

Injection<GenericRecord, byte[]> genericRecordInjection =
                                        GenericAvroCodecs.toBinary(schema);
byte[] bytes = genericRecordInjection.apply(type);
4

2 回答 2

1

https://github.com/miguno/kafka-storm-starter提供了这样的示例代码。

例如,参见AvroDecoderBolt。从它的javadocs:

此螺栓需要 Avro 编码的二进制格式的传入数据,并根据T. 它将传入的数据反序列化为一个Tpojo,并将这个 pojo 发送给下游消费者。因此,这个螺栓可以被认为是 Twitter Bijection 的Injection.invert[T, Array[Byte]](bytes)Avro 数据的 Storm 等价物。

在哪里

T: Avro 记录的类型(例如 a Tweet),基于正在使用的底层 Avro 模式。必须是 Avro 的子类SpecificRecordBase

代码的关键部分是(我将代码折叠到这个片段中):

// With T <: SpecificRecordBase

implicit val specificAvroBinaryInjection: Injection[T, Array[Byte]] =
SpecificAvroCodecs.toBinary[T]

val bytes: Array[Byte] = ...; // the Avro-encoded data
val decodeTry: Try[T] = Injection.invert(bytes)
decodeTry match {
  case Success(pojo) =>
    System.out.println("Binary data decoded into pojo: " + pojo)
  case Failure(e) => log.error("Could not decode binary data: " + Throwables.getStackTraceAsString(e))
}
于 2016-08-05T17:29:58.767 回答
0
Schema.Parser parser = new Schema.Parser();
            Schema schema = parser.parse(new File("/Users/.../schema.avsc"));
            Injection<Command, byte[]> objectInjection = SpecificAvroCodecs.toBinary(schema);
            byte[] bytes = objectInjection.apply(c);
于 2017-05-05T03:44:47.953 回答