5

在初始 EMR 步骤中,我使用S3DistCP将文件树从 S3 复制到 HDFS 。hdfs dfs -ls -R hdfs:///data_dir显示预期的文件,类似于:

/data_dir/year=2015/
/data_dir/year=2015/month=01/
/data_dir/year=2015/month=01/day=01/
/data_dir/year=2015/month=01/day=01/data01.12345678
/data_dir/year=2015/month=01/day=01/data02.12345678
/data_dir/year=2015/month=01/day=01/data03.12345678

“目录”被列为零字节文件。

然后我运行一个需要读取这些文件的火花步骤。因此加载代码是:

sqlctx.read.json('hdfs:///data_dir, schema=schema)

作业因 java 异常而失败

java.io.IOException: Not a file: hdfs://10.159.123.38:9000/data_dir/year=2015

我曾经(也许天真地)假设 spark 会递归地下降“dir 树”并加载数据文件。如果我指向 S3,它会成功加载数据。

我误解了HDFS吗?我可以告诉 spark 忽略零字节文件吗?我可以使用 S3DistCp 来展平树吗?

4

3 回答 3

6

在当前 spark 上下文的 Hadoop 配置中,为 Hadoop InputFormat 配置“递归”读取之前获取 sql ctx

val hadoopConf = sparkCtx.hadoopConfiguration
hadoopConf.set("mapreduce.input.fileinputformat.input.dir.recursive", "true")

这将给出“不是文件”的解决方案。接下来,读取多个文件:

Hadoop 作业从多个目录获取输入文件

或将文件列表合并为单个数据框:

使用 Spark 从目录中读取多个文件

于 2015-11-06T22:29:50.777 回答
3

问题解决了:

spark-submit ...
    --conf spark.hadoop.mapreduce.input.fileinputformat.input.dir.recursive=true \
    --conf spark.hive.mapred.supports.subdirectories=true \
    ...
于 2019-12-12T14:24:47.803 回答
1

在 spark 版本 2.1.0 中必须这样设置参数:

.set("spark.hive.mapred.supports.subdirectories","true")
.set("spark.hadoop.mapreduce.input.fileinputformat.input.dir.recursive","true")
于 2018-01-09T18:31:45.043 回答