2

我正在使用 pyspark 流式传输到来自 S3 的 ETL 输入文件。

我需要能够在 s3:// 上构建所有原始输入文件的审计跟踪,并且我的 parquet 输出最终在 hdfs:// 上。

给定一个 dstream、rdd 甚至特定的 rdd 分区,是否可以确定 s3 中输入数据的原始文件名?

目前我知道这样做的唯一方法是采用 rdd.toDebugString()并尝试解析它。然而,这感觉真的很hacky并且在某些情况下不起作用。例如,解析调试输出不适用于我也在做的批处理模式导入(使用sc.TextFile("s3://...foo/*")样式 glob)。

有没有人有确定原始文件名的理智方法?

似乎其他一些 spark 用户过去也有过这个问题,例如:

http://apache-spark-user-list.1001560.n3.nabble.com/Access-original-filename-in-a-map-function-tt2831.html

谢谢!

4

1 回答 1

1

我们遇到了同样的问题,而且文件足够小,所以我们使用了sc.wholeTextFiles("s3:...foo/*").

它创建了 RDD,("<path/filename>","<content>")我们将文件名附加到文件内容以供使用。

如何将 RDD[(String, String)] 转换为 RDD[Array[String]]?

于 2015-11-28T20:19:20.883 回答