4

我有一个ByteArrayInputStream被序列化的,List<TestAvroModel>它是SpecificRecord的一个实现。我找不到让 Avro 知道被序列化的列表的方法,所以我做了一个骇人听闻的方法来遍历ByteArrayInputStream.


//TestAvroModel is an implementation of SpecificRecord
List<TestAvroModel> models;
ByteArrayOutputStream byteArrayStream = new ByteArrayOutputStream();
for(TestAvroModel model: models) {
    DatumWriter<SpecificRecord> writer = new SpecificDatumWriter<>(model.getSchema());
    Encoder encoder = new EncoderFactory().binaryEncoder(byteArrayStream, null);

    writer.write(model, encoder);
    encoder.flush();
}

//This was pre-serialized with a List of TestAvroModel
ByteArrayInputStream inputStream;

DatumReader<TestAvroModel> reader = new SpecificDatumReader<>(TestAvroModel.getClassSchema());
Decoder decoder = DecoderFactory().get().binaryDecoder(inputStream, null);

List<TestAvroModel> records = new ArrayList<>();
boolean eof = false;
while(!eof) {
    try {
        records.add(reader.read(null, decoder));
    catch(EOFException ex) {
        eof = true;
    }
}

这种方式工作并一次读取序列化List<TestAvroModel>的一个并将其添加到我的记录列表中。虽然循环DatumReader直到 anEOFException似乎不是最好的方法,但我还没有找到更好的方法。

我在 Avro 库中找不到任何处理其中InputStream包含多个 Avro 记录的内容。尽管 Avro 必须在流中具有断点才能像我上面那样读取单个记录。重申一下,有没有人知道一种更好的方法来循环遍历DatumReader上面显示的方法?

4

1 回答 1

0

Decoder似乎isEnd(): Boolean为此目的定义:

如果当前 BinaryDecoder 位于其源数据的末尾并且在不引发 EOFException 或其他 IOException 的情况下无法进一步读取,则返回 true。

这应该有效:

...
while(!decoder.isEnd()) {
  records.add(reader.read(null, decoder));
}
于 2020-11-10T12:58:57.437 回答