0

我需要将以下转换为 Java 中的 Spark DataFrame,并根据 avro 模式保存结构。然后我将基于这个avro结构将它写入s3。

GenericRecord r = new GenericData.Record(inAvroSchema);
r.put("id", "1");
r.put("cnt", 111);

Schema enumTest =
        SchemaBuilder.enumeration("name1")
                .namespace("com.name")
                .symbols("s1", "s2");

GenericData.EnumSymbol symbol = new GenericData.EnumSymbol(enumTest, "s1");

r.put("type", symbol);

ByteArrayOutputStream bao = new ByteArrayOutputStream();
GenericDatumWriter<GenericRecord> w = new GenericDatumWriter<>(inAvroSchema);

Encoder e = EncoderFactory.get().jsonEncoder(inAvroSchema, bao);
w.write(r, e);
e.flush();

我可以基于 JSON 结构创建对象

  Object o = reader.read(null, DecoderFactory.get().jsonDecoder(inAvroSchema, new ByteArrayInputStream(bao.toByteArray())));

但也许有什么方法可以基于 ByteArrayInputStream(bao.toByteArray()) 创建 DataFrame?

谢谢

4

1 回答 1

0

不,您必须使用数据源来读取 Avro 数据。Spark 将 Avro 作为文件从文件系统读取是至关重要的,因为许多优化和功能都依赖于它(例如压缩和分区)。您必须添加spark-avro(除非您在 2.4 以上)。请注意,EnumType您使用的将String在 Spark 的Dataset

另请参阅:Spark:读取 inputStream 而不是 File

或者,您可以考虑部署一堆任务并通过/SparkContext#parallelize显式读取/写入文件。DatumReaderDatumWriter

于 2020-07-02T04:37:17.263 回答