我想获取由 AVRO 反序列化器创建的记录并将其发送到 ElasticSearch。我意识到我必须编写自定义代码才能做到这一点。
使用 LITERAL 选项,我拥有 JSON 模式,这是使用 GenericRecord 的第一步。但是,纵观整个 AVRO Java API,我看不到将 GenericRecord 用于一条记录的方法。所有示例都使用 DataFileReader。
简而言之,我无法从 Flume 事件中获取字段。
有没有人这样做过?TIA。
我能够弄清楚。我做了以下事情:
// Get the schema
String strSchema = event.getHeader("flume.avro.schema.literal");
// Get the body
byte[] body = event.getBody();
// Create the avro schema
Schema schema = Schema.Parser.parse(strSchema);
// Get the decoder to use to get the "record" from the event stream in object form
BinaryDecoder decoder = DecoderFactory.binaryDecoder(body, null);
// Get the datum reader
GenericDatumReader reader = new GenericDatumReader(schema);
// Get the Avro record in object form
GenericRecord record = reader.read(null, decoder);
// Now you can iterate over the fields
for (Schema.Field field : schema.getFields()) {
Object value = record.get(field.name());
// Code to add field to JSON to send to ElasticSearch not listed
// ...
} // for (Schema.Field field : schema.getFields()) {
这很好用。