2

我需要处理流入 S3 文件夹的 xml 文件。目前,我已将其实现如下。

一、使用Spark的fileStream读取文件

val data = ssc.fileStream[LongWritable, Text, TextInputFormat]("s3://myfolder/",(t: org.apache.hadoop.fs.Path) => true, newFilesOnly = true, hadoopConf).map(_._2.toString())

对于每个 RDD,检查是否有任何文件被读取

if (data.count() !=0)

将字符串写入新的 HDFS 目录

data.coalesce(1).saveAsTextFile(sdir);

创建从上述 HDFS 目录读取的 Dataframe

val loaddata = sqlContext.read.format("com.databricks.spark.xml").option("rowTag", "Trans").load(sdir)

对 Dataframe 进行一些处理并保存为 JSON

loaddata.write.mode("append").json("s3://mybucket/somefolder")

不知何故,我觉得上述方法非常低效,坦率地说相当学校孩子气。有更好的解决方案吗?任何帮助将不胜感激。

后续问题:如何操作数据框中的字段(而不是列)?我有一个非常复杂的嵌套 xml,当我使用上述方法时,我得到一个包含 9 列和 50 个奇数内部结构数组的数据框。这很好,除了需要修剪某些字段名称。有没有办法在不爆炸数据框的情况下实现这一点,因为我需要再次构建相同的结构?

4

1 回答 1

4

如果您使用 Spark 2.0,您可能能够使其与结构化流一起使用:

val inputDF = spark.readStream.format("com.databricks.spark.xml")
  .option("rowTag", "Trans")
  .load(path)
于 2016-11-18T15:41:45.390 回答