请原谅我的简单问题,但我对 Spark/Hadoop 比较陌生。
我正在尝试将一堆小的 CSV 文件加载到 Apache Spark 中。它们目前存储在 S3 中,但如果这样可以简化操作,我可以在本地下载它们。我的目标是尽可能有效地做到这一点。让一些单线程主机下载和解析一堆 CSV 文件,而我的几十个 Spark 工作人员却无所事事,这似乎是一种耻辱。我希望有一种惯用的方式来分发这项工作。
CSV 文件排列在一个目录结构中,如下所示:
2014/01-01/fileabcd.csv
2014/01-01/filedefg.csv
...
我有两年的数据,每天都有目录,每个里面都有几百个 CSV。所有这些 CSV 都应该具有相同的架构,但当然有可能一个 CSV 有问题,如果有几个有问题的文件,我不希望整个工作崩溃。只要在某处发生的日志中通知我,就可以跳过这些文件。
似乎我想到的每个 Spark 项目都是这种形式,我不知道如何解决。(例如,尝试读入一堆制表符分隔的天气数据,或者读入一堆日志文件来查看这些数据。)
我试过的
我已经尝试过 SparkR 和 Scala 库。我真的不在乎我需要使用哪种语言。我对使用正确的成语/工具更感兴趣。
纯斯卡拉
我最初的想法是枚举parallelize
所有year/mm-dd
组合的列表,这样我就可以让我的 Spark 工作人员每天都独立处理(下载并解析所有 CSV 文件,然后将它们堆叠在一起(unionAll()
)以减少它们)。不幸的是,使用spark-csv库下载和解析 CSV 文件只能在“父”/主作业中完成,而不是从每个子作业中完成,因为Spark 不允许作业嵌套。因此,只要我想使用 Spark 库进行导入/解析,这将不起作用。
混合语言
当然,您可以使用该语言的原生 CSV 解析来读取每个文件,然后将它们“上传”到 Spark。在 R 中,这是一些包的组合,用于将文件从 S3 中取出,然后是 a read.csv
,最后以 acreateDataFrame()
将数据放入 Spark。不幸的是,这真的很慢,而且似乎倒退了我希望 Spark 的工作方式。如果我的所有数据在进入 Spark 之前都通过 R 管道传输,为什么还要使用 Spark?
Hive/Sqoop/Phoenix/Pig/Flume/Flume Ng/s3distcp
我已经开始研究这些量身定制的工具,很快就不知所措。我的理解是,许多/所有这些工具都可用于将我的 CSV 文件从 S3 获取到 HDFS。
当然,从 HDFS 读取我的 CSV 文件会比 S3 更快,这样可以解决部分问题。但是我仍然有数以万计的 CSV 需要解析,并且我不知道在 Spark 中执行此操作的分布式方式。