I can successfully parse XML data dropped into a directory using Spark Streaming's fileStream method, and I can write the resulting RDDs out to a text file just fine:
val fStream = {
  ssc.fileStream[LongWritable, Text, XmlInputFormat](
    WATCHDIR, xmlFilter _, newFilesOnly = false, conf = hadoopConf)
}

fStream.foreachRDD(rdd =>
  if (rdd.count() == 0) {
    logger.info("No files..")
  })

val dStream = fStream.map { case (x, y) =>
  logger.info("Hello from the dStream")
  logger.info(y.toString)
  scalaxb.fromXML[Music](scala.xml.XML.loadString(y.toString))
}

dStream.foreachRDD(rdd => rdd.saveAsTextFile("file:///tmp/xmlout"))
The problem comes when I want to convert the RDDs into DataFrames so that I can register them as temp tables or call saveAsParquetFile.
This code:
import org.apache.spark.sql.SQLContext

val sqlContext = new SQLContext(sc)
import sqlContext.implicits._
dStream.foreachRDD(rdd => rdd.distinct().toDF().printSchema())
causes this error:
java.lang.UnsupportedOperationException: Schema for type scalaxb.DataRecord[scala.Any] is not supported
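For context, once the conversion to a DataFrame works, what I want to do with it is roughly the following (the table name and output path below are just placeholders):

dStream.foreachRDD { rdd =>
  val df = rdd.distinct().toDF()
  df.registerTempTable("music")                      // query it via sqlContext.sql(...)
  df.saveAsParquetFile("file:///tmp/music.parquet")  // or persist it as Parquet
}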
I would have thought that since scalaxb generates case classes for my records, and Spark uses reflection on case classes to infer the schema, this would be straightforward. I can see that this is exactly what Spark is trying to do, except that it does not support the scalaxb.DataRecord type. Does any Spark or scalaxb expert have ideas on how to make the case classes generated by scalaxb compatible with Spark?
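For what it's worth, here is the kind of plain case class that, as far as I understand, Spark's reflection-based toDF() handles with no trouble (Track and its fields are made-up names, purely for illustration):

case class Track(title: String, length: Option[String])

// Schema inference works here because every field has a type Spark knows how to map
// (String, Option[String], ...); this relies on the sqlContext implicits imported above.
sc.parallelize(Seq(Track("Foo", Some("3:05")))).toDF().printSchema()

The Map[String, scalaxb.DataRecord[Any]] field in my generated classes is apparently what breaks that inference.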
By the way, here are the classes generated by scalaxb:
package generated

case class Song(attributes: Map[String, scalaxb.DataRecord[Any]] = Map()) {
  lazy val title = attributes.get("@title") map { _.as[String] }
  lazy val length = attributes.get("@length") map { _.as[String] }
}

case class Album(song: Seq[generated.Song] = Nil,
                 description: String,
                 attributes: Map[String, scalaxb.DataRecord[Any]] = Map()) {
  lazy val title = attributes.get("@title") map { _.as[String] }
}

case class Artist(album: Seq[generated.Album] = Nil,
                  attributes: Map[String, scalaxb.DataRecord[Any]] = Map()) {
  lazy val name = attributes.get("@name") map { _.as[String] }
}

case class Music(artist: Seq[generated.Artist] = Nil)
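Just to be explicit about what I mean by "compatible": the only workaround I can picture is to flatten the records into plain case classes before calling toDF(), something like the sketch below (FlatSong and its field mapping are my own hypothetical names, not anything scalaxb generates):

case class FlatSong(artist: Option[String], album: Option[String],
                    title: Option[String], length: Option[String])

// Flatten Music -> Artist -> Album -> Song into one row per song, keeping only
// fields whose types Spark's reflection supports (here, Option[String]).
val flatStream = dStream.flatMap { music =>
  for {
    artist <- music.artist
    album  <- artist.album
    song   <- album.song
  } yield FlatSong(artist.name, album.title, song.title, song.length)
}

flatStream.foreachRDD(rdd => rdd.distinct().toDF().printSchema())

But hand-writing flat copies of every generated class defeats the point of using scalaxb in the first place, so I'm hoping there is a cleaner way.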