5

请原谅我的简单问题,但我对 Spark/Hadoop 比较陌生。

我正在尝试将一堆小的 CSV 文件加载到 Apache Spark 中。它们目前存储在 S3 中,但如果这样可以简化操作,我可以在本地下载它们。我的目标是尽可能有效地做到这一点。让一些单线程主机下载和解析一堆 CSV 文件,而我的几十个 Spark 工作人员却无所事事,这似乎是一种耻辱。我希望有一种惯用的方式来分发这项工作。

CSV 文件排列在一个目录结构中,如下所示:

2014/01-01/fileabcd.csv
2014/01-01/filedefg.csv
...

我有两年的数据,每天都有目录,每个里面都有几百个 CSV。所有这些 CSV 都应该具有相同的架构,但当然有可能一个 CSV 有问题,如果有几个有问题的文件,我不希望整个工作崩溃。只要在某处发生的日志中通知我,就可以跳过这些文件。

似乎我想到的每个 Spark 项目都是这种形式,我不知道如何解决。(例如,尝试读入一堆制表符分隔的天气数据,或者读入一堆日志文件来查看这些数据。)

我试过的

我已经尝试过 SparkR 和 Scala 库。我真的不在乎我需要使用哪种语言。我对使用正确的成语/工具更感兴趣。

纯斯卡拉

我最初的想法是枚举parallelize所有year/mm-dd组合的列表,这样我就可以让我的 Spark 工作人员每天都独立处理(下载并解析所有 CSV 文件,然后将它们堆叠在一起(unionAll())以减少它们)。不幸的是,使用spark-csv库下载和解析 CSV 文件只能在“父”/主作业中完成,而不是从每个子作业中完成,因为Spark 不允许作业嵌套。因此,只要我想使用 Spark 库进行导入/解析,这将不起作用。

混合语言

当然,您可以使用该语言的原生 CSV 解析来读取每个文件,然后将它们“上传”到 Spark。在 R 中,这是一些包的组合,用于将文件从 S3 中取出,然后是 a read.csv,最后以 acreateDataFrame()将数据放入 Spark。不幸的是,这真的很慢,而且似乎倒退了我希望 Spark 的工作方式。如果我的所有数据在进入 Spark 之前都通过 R 管道传输,为什么还要使用 Spark?

Hive/Sqoop/Phoenix/Pig/Flume/Flume Ng/s3distcp

我已经开始研究这些量身定制的工具,很快就不知所措。我的理解是,许多/所有这些工具都可用于将我的 CSV 文件从 S3 获取到 HDFS。

当然,从 HDFS 读取我的 CSV 文件会比 S3 更快,这样可以解决部分问题。但是我仍然有数以万计的 CSV 需要解析,并且我不知道在 Spark 中执行此操作的分布式方式。

4

2 回答 2

2

所以现在(Spark 1.4)SparkR 支持json文件parquet结构。可以解析 CSV 文件,但随后需要使用额外的 jar 启动 spark 上下文(需要下载并放置在适当的文件夹中,我自己从未这样做过,但我的同事有)。

sc <- sparkR.init(sparkPackages="com.databricks:spark-csv_2.11:1.0.3")
sqlContext <- sparkRSQL.init(sc)

文档中有更多信息。我希望更新的 spark 版本会对此有更多的支持。

如果您不这样做,则需要使用不同的文件结构或使用 python 将所有文件.csv.parquet. 这是最近的一次 python 谈话中的一个片段,它做到了这一点。

data = sc.textFile(s3_paths, 1200).cache()

def caster(x):
    return Row(colname1 = x[0], colname2 = x[1])

df_rdd = data\
    .map(lambda x: x.split(','))\
    .map(caster)

ddf = sqlContext.inferSchema(df_rdd).cache()

ddf.write.save('s3n://<bucket>/<filename>.parquet')

另外,你的数据集有多大?您甚至可能不需要 spark 进行分析。请注意,从现在开始;

  • SparkR 仅支持 DataFrame。
  • 还没有分布式机器学习。
  • 对于可视化,如果你想使用像ggplot2.
  • 如果您的数据集不超过几 GB,那么学习 spark 的额外麻烦可能还不值得
  • 现在很谦虚,但你可以期待更多的未来
于 2015-08-03T20:32:43.510 回答
1

我之前遇到过这个问题(但是会读取大量 Parquet 文件),我的建议是避免使用数据帧并使用 RDD。

使用的一般成语是:

  1. 读入文件列表,每个文件都是一行(在驱动程序中)。这里的预期输出是字符串列表
  2. 并行化字符串列表并使用客户 csv 阅读器映射它们。返回是案例类的列表。

如果最终您想要一个可以重写为 parquet 或数据库的数据结构,例如 List[weather_data],您也可以使用 flatMap。

于 2015-11-21T16:27:32.223 回答