4

我正在使用 AWS 胶水 ETL 作业以镶木地板格式转换 s3 上的 CSV 数据。Snappy 压缩的 parquet 数据存储回 s3。

完整架构:当数据上传到 s3 时,如果 lambda 函数尚未运行,它会触发粘合 ETL 作业。一个作业不断地在 s3 上上传胶水输入数据。Glue 成功处理了 100GB 的数据,但由于输入数据堆积到 0.5 到 1TB,Glue 作业在运行很长时间(例如 10 小时)后抛出错误。

Traceback (most recent call last):
File "script_2018-01-08-23-01-55.py", line 60, in <module>
partitioned_dataframe.write.partitionBy(['part_date']).format("parquet").save(output_lg_partitioned_dir, mode="append")
File "/mnt/yarn/usercache/root/appcache/application_1515414270379_0004/container_1515414270379_0004_02_000001/pyspark.zip/pyspark/sql/readwriter.py", line 550, in save
File "/mnt/yarn/usercache/root/appcache/application_1515414270379_0004/container_1515414270379_0004_02_000001/py4j-0.10.4-src.zip/py4j/java_gateway.py", line 1133, in __call__
File "/mnt/yarn/usercache/root/appcache/application_1515414270379_0004/container_1515414270379_0004_02_000001/pyspark.zip/pyspark/sql/utils.py", line 63, in deco
File "/mnt/yarn/usercache/root/appcache/application_1515414270379_0004/container_1515414270379_0004_02_000001/py4j-0.10.4-src.zip/py4j/protocol.py", line 319, in get_return_value
py4j.protocol.Py4JJavaError: An error occurred while calling o193.save.
: org.apache.spark.SparkException: Job aborted.
at org.apache.spark.sql.execution.datasources.FileFormatWriter$$anonfun$write$1.apply$mcV$sp(FileFormatWriter.scala:147)
at org.apache.spark.sql.execution.datasources.FileFormatWriter$$anonfun$write$1.apply(FileFormatWriter.scala:121)
at org.apache.spark.sql.execution.datasources.FileFormatWriter$$anonfun$write$1.apply(FileFormatWriter.scala:121)
at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:57)
at org.apache.spark.sql.execution.datasources.FileFormatWriter$.write(FileFormatWriter.scala:121)
at org.apache.spark.sql.execution.datasources.InsertIntoHadoopFsRelationCommand.run(InsertIntoHadoopFsRelationCommand.scala:101)
at org.apache.spark.sql.execution.command.ExecutedCommandExec.sideEffectResult$lzycompute(commands.scala:58)
at org.apache.spark.sql.execution.command.ExecutedCommandExec.sideEffectResult(commands.scala:56)
at org.apache.spark.sql.execution.command.ExecutedCommandExec.doExecute(commands.scala:74)
at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:114)
at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:114)
at org.apache.spark.sql.execution.SparkPlan$$anonfun$executeQuery$1.apply(SparkPlan.scala:135)
at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
at org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:132)
at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:113)
at org.apache.spark.sql.execution.QueryExecution.toRdd$lzycompute(QueryExecution.scala:87)
at org.apache.spark.sql.execution.QueryExecution.toRdd(QueryExecution.scala:87)
at org.apache.spark.sql.execution.datasources.DataSource.write(DataSource.scala:492)
at org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:215)
at org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:198)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
at py4j.Gateway.invoke(Gateway.java:280)
at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
at py4j.commands.CallCommand.execute(CallCommand.java:79)
at py4j.GatewayConnection.run(GatewayConnection.java:214)
at java.lang.Thread.run(Thread.java:748)
Caused by: org.apache.spark.SparkException: Job aborted due to stage failure: Total size of serialized results of 3228 tasks (1024.0 MB) is bigger than spark.driver.maxResultSize (1024.0 MB)
at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1435)
at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1423)
at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1422)
at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)
at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1422)
at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:802)
at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:802)
at scala.Option.foreach(Option.scala:257)
at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:802)
at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:1650)
at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1605)
at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1594)
at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:48)
at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:628)
at org.apache.spark.SparkContext.runJob(SparkContext.scala:1918)
at org.apache.spark.SparkContext.runJob(SparkContext.scala:1931)
at org.apache.spark.SparkContext.runJob(SparkContext.scala:1951)
at org.apache.spark.sql.execution.datasources.FileFormatWriter$$anonfun$write$1.apply$mcV$sp(FileFormatWriter.scala:127)
... 30 more

End of LogType:stdout

我做了很多工作来解决这个错误,但没有任何线索。尽管我尝试了一些建议的方法,例如-

setting SparkConf: conf.set("spark.driver.maxResultSize", "3g")

上述设置无效。如果您能提供任何指导来解决此问题,我将不胜感激。

4

2 回答 2

0
from pyspark import SparkConf

sc_conf.set("spark.driver.maxResultSize", 0)
sc_conf.set("spark.executor.memory", '4g')

sc = SparkContext(conf=sc_conf)

对我有用,请试一试

于 2019-09-11T02:20:30.763 回答
0

Glue 默认 DPU 计数为 10 DPU,其中单个数据处理单元 (DPU) 提供 4 个 vCPU 和 16 GB 内存。尝试增加单个作业运行的 DPU 计数。

对于您的用例,我建议将 DPU 计数增加到 64。至于单次运行,您将收到近 1 TB 的文件。目前默认情况下,您可以使用 100 DPU 来运行单个 ETL 作业。参考尽管您始终可以联系 AWS 支持以获取任何限制增加。

于 2018-01-20T03:38:40.777 回答