4

对于小型 s3 输入文件 (~10GB),粘合 ETL 作业可以正常工作,但对于较大的数据集 (~200GB),该作业会失败。

添加部分ETL代码。

# Converting Dynamic frame to dataframe
df = dropnullfields3.toDF()

# create new partition column
partitioned_dataframe = df.withColumn('part_date', df['timestamp_utc'].cast('date'))

# store the data in parquet format on s3 
partitioned_dataframe.write.partitionBy(['part_date']).format("parquet").save(output_lg_partitioned_dir, mode="append")

作业执行 4 小时并抛出错误。

文件“script_2017-11-23-15-07-32.py”,第 49 行,在 partitioned_dataframe.write.partitionBy(['part_date']).format("parquet").save(output_lg_partitioned_dir, mode="append" )文件“/mnt/yarn/usercache/root/appcache/application_1511449472652_0001/container_1511449472652_0001_02_000001/pyspark.zip/pyspark/sql/readwriter.py”,第550行,保存文件“/mnt/yarn/usercache/root/appcache/application_1511014974 /container_1511449472652_0001_02_000001/py4j-0.10.4-src.zip/py4j/java_gateway.py”,第 1133 行,通话中文件“/mnt/yarn/usercache/root/appcache/application_1511449472652_0001/container_1511449472652_0001_02_000001/pyspark.zip/pyspark/sql/utils.py”,第 63 行,在 deco 文件“/mnt/yarn/usercache/root/appcache/application_15114494726 container_1511449472652_0001_02_000001/py4j-0.10.4-src.zip/py4j/protocol.py”,第 319 行,在 get_return_value py4j.protocol.Py4JJavaError:调用 o172.save 时出错。:org.apache.spark.SparkException:作业中止。在 org.apache.spark.sql.execution.datasources.FileFormatWriter$$anonfun$write$1.apply$mcV$sp(FileFormatWriter.scala:147) 在 org.apache.spark.sql.execution.datasources.FileFormatWriter$$anonfun $write$1.apply(FileFormatWriter.scala:121) at org.apache.spark.sql.execution.datasources.FileFormatWriter$$anonfun$write$1.apply(FileFormatWriter.scala:

日志类型结束:stdout

如果您能提供任何指导来解决此问题,我将不胜感激。

4

1 回答 1

2

您只能maxResultSize在上下文实例化期间设置可配置选项,并且胶水为您提供上下文(从内存中您无法实例化新上下文)。我认为您无法更改此属性的值。

如果您向驱动程序收集超过指定大小的结果,您通常会收到此错误。在这种情况下,您没有这样做,因此错误令人困惑。

似乎您正在生成 3385 个任务,这些任务可能与您输入文件中的日期有关(3385 个日期,约 9 年?)。您可以尝试分批编写此文件,例如

partitioned_dataframe = df.withColumn('part_date', df['timestamp_utc'].cast('date'))
for year in range(2000,2018):
    partitioned_dataframe = partitioned_dateframe.where(year(part_date) = year)
    partitioned_dataframe.write.partitionBy(['part_date'])
        .format("parquet")
        .save(output_lg_partitioned_dir, mode="append")

我没有检查过这段代码;你至少需要导入pyspark.sql.functions.year它才能工作。

当我使用 Glue 完成数据处理时,我发现批处理工作比尝试成功完成大型数据集更有效。系统不错,但很难调试;大数据的稳定性来之不易。

于 2018-01-10T18:20:43.590 回答