https://github.com/miguno/kafka-storm-starter提供了这样的示例代码。
例如,参见AvroDecoderBolt。从它的javadocs:
此螺栓需要 Avro 编码的二进制格式的传入数据,并根据T
. 它将传入的数据反序列化为一个T
pojo,并将这个 pojo 发送给下游消费者。因此,这个螺栓可以被认为是 Twitter Bijection 的Injection.invert[T, Array[Byte]](bytes)
Avro 数据的 Storm 等价物。
在哪里
T
: Avro 记录的类型(例如 a Tweet
),基于正在使用的底层 Avro 模式。必须是 Avro 的子类SpecificRecordBase
。
代码的关键部分是(我将代码折叠到这个片段中):
// With T <: SpecificRecordBase
implicit val specificAvroBinaryInjection: Injection[T, Array[Byte]] =
SpecificAvroCodecs.toBinary[T]
val bytes: Array[Byte] = ...; // the Avro-encoded data
val decodeTry: Try[T] = Injection.invert(bytes)
decodeTry match {
case Success(pojo) =>
System.out.println("Binary data decoded into pojo: " + pojo)
case Failure(e) => log.error("Could not decode binary data: " + Throwables.getStackTraceAsString(e))
}