2

我想获取由 AVRO 反序列化器创建的记录并将其发送到 ElasticSearch。我意识到我必须编写自定义代码才能做到这一点。

使用 LITERAL 选项,我拥有 JSON 模式,这是使用 GenericRecord 的第一步。但是,纵观整个 AVRO Java API,我看不到将 GenericRecord 用于一条记录的方法。所有示例都使用 DataFileReader。

简而言之,我无法从 Flume 事件中获取字段。

有没有人这样做过?TIA。

4

1 回答 1

2

我能够弄清楚。我做了以下事情:

// Get the schema
String strSchema = event.getHeader("flume.avro.schema.literal");
// Get the body
byte[] body = event.getBody();

// Create the avro schema
Schema schema = Schema.Parser.parse(strSchema);

// Get the decoder to use to get the "record" from the event stream in object form
BinaryDecoder decoder = DecoderFactory.binaryDecoder(body, null); 

// Get the datum reader
GenericDatumReader reader = new GenericDatumReader(schema);

// Get the Avro record in object form
GenericRecord record = reader.read(null, decoder);

// Now you can iterate over the fields
for (Schema.Field field : schema.getFields()) {
   Object value = record.get(field.name());

   // Code to add field to JSON to send to ElasticSearch not listed
   // ...

} // for (Schema.Field field : schema.getFields()) {

这很好用。

于 2014-06-12T00:23:47.207 回答