我通过 AWS 的 Managed Airflow 服务 (MWAA) 向 EMR 提交 Spark 作业。这些作业在 MWAA 1.10.12 版中运行良好。最近,AWS 发布了更新版本的 MWAA,即 2.0.2。我用这个版本创建了一个新环境,并尝试将相同的作业提交给 EMR。但它失败并出现以下错误:
Exception in thread "main" java.lang.IllegalArgumentException: java.net.URISyntaxException: Expected scheme-specific part at index 3: s3:
at org.apache.hadoop.fs.Path.initialize(Path.java:263)
at org.apache.hadoop.fs.Path.<init>(Path.java:221)
at org.apache.hadoop.fs.Path.<init>(Path.java:129)
at org.apache.hadoop.fs.Globber.doGlob(Globber.java:229)
at org.apache.hadoop.fs.Globber.glob(Globber.java:149)
at org.apache.hadoop.fs.FileSystem.globStatus(FileSystem.java:2096)
at org.apache.hadoop.fs.FileSystem.globStatus(FileSystem.java:2078)
at org.apache.spark.deploy.DependencyUtils$.resolveGlobPath(DependencyUtils.scala:192)
at org.apache.spark.deploy.DependencyUtils$.$anonfun$resolveGlobPaths$2(DependencyUtils.scala:147)
at org.apache.spark.deploy.DependencyUtils$.$anonfun$resolveGlobPaths$2$adapted(DependencyUtils.scala:145)
at scala.collection.TraversableLike.$anonfun$flatMap$1(TraversableLike.scala:245)
at scala.collection.IndexedSeqOptimized.foreach(IndexedSeqOptimized.scala:36)
at scala.collection.IndexedSeqOptimized.foreach$(IndexedSeqOptimized.scala:33)
at scala.collection.mutable.WrappedArray.foreach(WrappedArray.scala:38)
at scala.collection.TraversableLike.flatMap(TraversableLike.scala:245)
at scala.collection.TraversableLike.flatMap$(TraversableLike.scala:242)
at scala.collection.AbstractTraversable.flatMap(Traversable.scala:108)
at org.apache.spark.deploy.DependencyUtils$.resolveGlobPaths(DependencyUtils.scala:145)
at org.apache.spark.deploy.SparkSubmit.$anonfun$prepareSubmitEnvironment$5(SparkSubmit.scala:364)
at scala.Option.map(Option.scala:230)
at org.apache.spark.deploy.SparkSubmit.prepareSubmitEnvironment(SparkSubmit.scala:364)
at org.apache.spark.deploy.SparkSubmit.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:902)
at org.apache.spark.deploy.SparkSubmit.doRunMain$1(SparkSubmit.scala:180)
at org.apache.spark.deploy.SparkSubmit.submit(SparkSubmit.scala:203)
at org.apache.spark.deploy.SparkSubmit.doSubmit(SparkSubmit.scala:90)
at org.apache.spark.deploy.SparkSubmit$$anon$2.doSubmit(SparkSubmit.scala:1038)
at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:1047)
at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)
Caused by: java.net.URISyntaxException: Expected scheme-specific part at index 3: s3:
at java.net.URI$Parser.fail(URI.java:2847)
at java.net.URI$Parser.failExpecting(URI.java:2853)
at java.net.URI$Parser.parse(URI.java:3056)
at java.net.URI.<init>(URI.java:746)
at org.apache.hadoop.fs.Path.initialize(Path.java:260)
... 27 more
Command exiting with ret '1'
spark-submit命令如下所示:
spark-submit --deploy-mode cluster
--master yarn
--queue low
--jars s3://bucket-name/jars/mysql-connector-java-8.0.20.jar,s3://bucket-name/jars/postgresql-42.1.1.jar,s3://bucket-name/jars/delta-core_2.12-1.0.0.jar
--py-files s3://bucket-name/dependencies/spark.py, s3://bucket-name/dependencies/helper_functions.py
--files s3://bucket-name/configs/spark_config.json
s3://bucket-name/jobs/data_processor.py [command-line-args]
作业提交在 10 秒内失败。因此,未创建 YARN 应用程序 ID。
我试图解决的错误:
- 我将亚马逊相关包添加到
requirements.txt
:
apache-airflow[mysql]==2.0.2
pycryptodome==3.9.9
apache-airflow-providers-amazon==1.3.0
- 我将导入语句从:
from airflow.contrib.operators.emr_add_steps_operator import EmrAddStepsOperator
from airflow.contrib.operators.emr_create_job_flow_operator import EmrCreateJobFlowOperator
from airflow.contrib.operators.emr_terminate_job_flow_operator import EmrTerminateJobFlowOperator
from airflow.contrib.sensors.emr_step_sensor import EmrStepSensor
from airflow.hooks.S3_hook import S3Hook
from airflow.operators.python_operator import PythonOperator
from airflow.operators.bash_operator import BashOperator
from airflow.models import Variable
至
from airflow.providers.amazon.aws.operators.emr_create_job_flow import EmrCreateJobFlowOperator
from airflow.providers.amazon.aws.operators.emr_add_steps import EmrAddStepsOperator
from airflow.providers.amazon.aws.operators.emr_terminate_job_flow import EmrTerminateJobFlowOperator
from airflow.providers.amazon.aws.sensors.emr_step import EmrStepSensor
from airflow.providers.amazon.aws.hooks.s3 import S3Hook
from airflow.operators.python import PythonOperator
from airflow.operators.bash import BashOperator
from airflow import DAG
from airflow.models import Variable
- 将 URI 方案更改为 s3n 和 s3a
我查看了有关 MWAA 以及 Airflow 2.0.2 的官方文档和博客,并进行了上述更改。但到目前为止没有任何效果。我寻求帮助以尽早解决此错误。提前致谢