我需要处理流入 S3 文件夹的 xml 文件。目前,我已将其实现如下。
一、使用Spark的fileStream读取文件
val data = ssc.fileStream[LongWritable, Text, TextInputFormat]("s3://myfolder/",(t: org.apache.hadoop.fs.Path) => true, newFilesOnly = true, hadoopConf).map(_._2.toString())
对于每个 RDD,检查是否有任何文件被读取
if (data.count() !=0)
将字符串写入新的 HDFS 目录
data.coalesce(1).saveAsTextFile(sdir);
创建从上述 HDFS 目录读取的 Dataframe
val loaddata = sqlContext.read.format("com.databricks.spark.xml").option("rowTag", "Trans").load(sdir)
对 Dataframe 进行一些处理并保存为 JSON
loaddata.write.mode("append").json("s3://mybucket/somefolder")
不知何故,我觉得上述方法非常低效,坦率地说相当学校孩子气。有更好的解决方案吗?任何帮助将不胜感激。
后续问题:如何操作数据框中的字段(而不是列)?我有一个非常复杂的嵌套 xml,当我使用上述方法时,我得到一个包含 9 列和 50 个奇数内部结构数组的数据框。这很好,除了需要修剪某些字段名称。有没有办法在不爆炸数据框的情况下实现这一点,因为我需要再次构建相同的结构?