0

我有avro消息和.avsc文件。我已经从.avsc文件生成了 java 类。现在我想将 avro(json) 消息转换为数据帧。我读了消息。成功解码了消息,我得到了 RDD[Product] 但我无法将 RDD[Product] 转换为数据帧。我需要将消息保存为 .avro 格式。

def main(args: Array[String]): Unit = {

val conf = new SparkConf().setAppName("test").setMaster("local[*]")
val spark = SparkSession.builder().config(conf).getOrCreate()

import spark.implicits._
val rdd = spark.read.textFile("/Users/lucy/product_avro.json").rdd

val rdd1 = rdd.map(string => toProduct(string))
spark.createDataFrame(rdd1, classOf[Product]) // not working

}

def toProduct(input: String): Product = {
    return new SpecificDatumReader[Product](Product.SCHEMA$)
      .read(null, DecoderFactory.get().jsonDecoder(Product.SCHEMA$, input))

  }

错误:java.lang.UnsupportedOperationException:在 bean 类中不能有循环引用,但是得到了类 org.apache.avro.Schema 的循环引用

4

0 回答 0