
I submit Spark jobs to EMR through AWS's managed Airflow service (MWAA). These jobs ran fine on MWAA version 1.10.12. Recently, AWS released a newer version of MWAA, 2.0.2. I created a new environment with this version and tried to submit the same jobs to EMR, but they fail with the following error:

Exception in thread "main" java.lang.IllegalArgumentException: java.net.URISyntaxException: Expected scheme-specific part at index 3: s3:
    at org.apache.hadoop.fs.Path.initialize(Path.java:263)
    at org.apache.hadoop.fs.Path.<init>(Path.java:221)
    at org.apache.hadoop.fs.Path.<init>(Path.java:129)
    at org.apache.hadoop.fs.Globber.doGlob(Globber.java:229)
    at org.apache.hadoop.fs.Globber.glob(Globber.java:149)
    at org.apache.hadoop.fs.FileSystem.globStatus(FileSystem.java:2096)
    at org.apache.hadoop.fs.FileSystem.globStatus(FileSystem.java:2078)
    at org.apache.spark.deploy.DependencyUtils$.resolveGlobPath(DependencyUtils.scala:192)
    at org.apache.spark.deploy.DependencyUtils$.$anonfun$resolveGlobPaths$2(DependencyUtils.scala:147)
    at org.apache.spark.deploy.DependencyUtils$.$anonfun$resolveGlobPaths$2$adapted(DependencyUtils.scala:145)
    at scala.collection.TraversableLike.$anonfun$flatMap$1(TraversableLike.scala:245)
    at scala.collection.IndexedSeqOptimized.foreach(IndexedSeqOptimized.scala:36)
    at scala.collection.IndexedSeqOptimized.foreach$(IndexedSeqOptimized.scala:33)
    at scala.collection.mutable.WrappedArray.foreach(WrappedArray.scala:38)
    at scala.collection.TraversableLike.flatMap(TraversableLike.scala:245)
    at scala.collection.TraversableLike.flatMap$(TraversableLike.scala:242)
    at scala.collection.AbstractTraversable.flatMap(Traversable.scala:108)
    at org.apache.spark.deploy.DependencyUtils$.resolveGlobPaths(DependencyUtils.scala:145)
    at org.apache.spark.deploy.SparkSubmit.$anonfun$prepareSubmitEnvironment$5(SparkSubmit.scala:364)
    at scala.Option.map(Option.scala:230)
    at org.apache.spark.deploy.SparkSubmit.prepareSubmitEnvironment(SparkSubmit.scala:364)
    at org.apache.spark.deploy.SparkSubmit.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:902)
    at org.apache.spark.deploy.SparkSubmit.doRunMain$1(SparkSubmit.scala:180)
    at org.apache.spark.deploy.SparkSubmit.submit(SparkSubmit.scala:203)
    at org.apache.spark.deploy.SparkSubmit.doSubmit(SparkSubmit.scala:90)
    at org.apache.spark.deploy.SparkSubmit$$anon$2.doSubmit(SparkSubmit.scala:1038)
    at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:1047)
    at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)
Caused by: java.net.URISyntaxException: Expected scheme-specific part at index 3: s3:
    at java.net.URI$Parser.fail(URI.java:2847)
    at java.net.URI$Parser.failExpecting(URI.java:2853)
    at java.net.URI$Parser.parse(URI.java:3056)
    at java.net.URI.<init>(URI.java:746)
    at org.apache.hadoop.fs.Path.initialize(Path.java:260)
    ... 27 more
Command exiting with ret '1'

The spark-submit command looks like this:

spark-submit --deploy-mode cluster 
             --master yarn 
             --queue low 
             --jars s3://bucket-name/jars/mysql-connector-java-8.0.20.jar,s3://bucket-name/jars/postgresql-42.1.1.jar,s3://bucket-name/jars/delta-core_2.12-1.0.0.jar 
             --py-files s3://bucket-name/dependencies/spark.py, s3://bucket-name/dependencies/helper_functions.py 
             --files s3://bucket-name/configs/spark_config.json 
             s3://bucket-name/jobs/data_processor.py [command-line-args]
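For context, the command above is rendered from an EMR step definition in the DAG. A minimal sketch of what such a step looks like — the names and paths here are placeholders, not the actual DAG:

```python
# Hypothetical sketch of an EMR step passed to EmrAddStepsOperator;
# every name and path below is a placeholder. EMR joins the Args list
# into the spark-submit command line, so any empty or whitespace-only
# element (e.g. left over from templating) can surface on the cluster
# as a truncated token such as 's3:'.
SPARK_STEP = {
    "Name": "data_processor",
    "ActionOnFailure": "CONTINUE",
    "HadoopJarStep": {
        "Jar": "command-runner.jar",
        "Args": [
            "spark-submit",
            "--deploy-mode", "cluster",
            "--master", "yarn",
            "--queue", "low",
            "--jars", "s3://bucket-name/jars/delta-core_2.12-1.0.0.jar",
            "--py-files", "s3://bucket-name/dependencies/helper_functions.py",
            "--files", "s3://bucket-name/configs/spark_config.json",
            "s3://bucket-name/jobs/data_processor.py",
        ],
    },
}

# Every element should be a non-empty string with no stray whitespace:
assert all(isinstance(a, str) and a == a.strip() and a
           for a in SPARK_STEP["HadoopJarStep"]["Args"])
```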

The job submission fails within 10 seconds, so no YARN application ID is created.

What I have tried to resolve the error:

  1. I added the Amazon-related packages to requirements.txt:

apache-airflow[mysql]==2.0.2
pycryptodome==3.9.9
apache-airflow-providers-amazon==1.3.0
  2. I changed the import statements from:
from airflow.contrib.operators.emr_add_steps_operator import EmrAddStepsOperator
from airflow.contrib.operators.emr_create_job_flow_operator import EmrCreateJobFlowOperator
from airflow.contrib.operators.emr_terminate_job_flow_operator import EmrTerminateJobFlowOperator
from airflow.contrib.sensors.emr_step_sensor import EmrStepSensor
from airflow.hooks.S3_hook import S3Hook
from airflow.operators.python_operator import PythonOperator
from airflow.operators.bash_operator import BashOperator
from airflow.models import Variable

to:

from airflow.providers.amazon.aws.operators.emr_create_job_flow import EmrCreateJobFlowOperator
from airflow.providers.amazon.aws.operators.emr_add_steps import EmrAddStepsOperator
from airflow.providers.amazon.aws.operators.emr_terminate_job_flow import EmrTerminateJobFlowOperator
from airflow.providers.amazon.aws.sensors.emr_step import EmrStepSensor
from airflow.providers.amazon.aws.hooks.s3 import S3Hook
from airflow.operators.python import PythonOperator
from airflow.operators.bash import BashOperator
from airflow import DAG
from airflow.models import Variable
  3. I changed the URI scheme to s3n and s3a.
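One way to narrow this down (my own debugging sketch, not something from the MWAA docs) is to validate every rendered token before the step is submitted. A bare `s3:` with nothing after the colon, or a URI with a leading space (note the space after the comma in the `--py-files` value above), is exactly the kind of token that makes Hadoop's `Path` constructor fail with `Expected scheme-specific part at index 3: s3:`.

```python
from urllib.parse import urlparse

def find_bad_s3_tokens(args):
    """Return spark-submit argument parts that would break Hadoop's Path parser.

    --jars/--py-files/--files values are comma-separated, so each part is
    checked individually. Flags parts with surrounding whitespace and
    s3/s3n/s3a URIs that have no bucket (e.g. a bare 's3:').
    """
    bad = []
    for token in args:
        for part in token.split(","):
            if part != part.strip():
                bad.append(part)  # stray whitespace, e.g. after a comma
            elif urlparse(part).scheme in ("s3", "s3n", "s3a") and not urlparse(part).netloc:
                bad.append(part)  # scheme with no bucket/key -> URISyntaxException
    return bad

# The space after the comma in --py-files and a truncated 's3:' both get flagged:
print(find_bad_s3_tokens([
    "s3://bucket-name/dependencies/spark.py, s3://bucket-name/dependencies/helper_functions.py",
    "s3:",
    "s3://bucket-name/jobs/data_processor.py",
]))
# -> [' s3://bucket-name/dependencies/helper_functions.py', 's3:']
```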

I went through the official documentation and blog posts on MWAA and Airflow 2.0.2 and made the changes above, but nothing has worked so far. Any help resolving this error would be much appreciated. Thanks in advance.
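For completeness, the import changes above can be verified directly inside the MWAA 2.0.2 environment. This is my own sketch (not an official MWAA check): it reports whether each of the new provider module paths actually resolves, which helps separate an import problem from a step-rendering problem.

```python
import importlib.util

def module_status(names):
    """Map each dotted module name to whether it is importable here."""
    status = {}
    for name in names:
        try:
            status[name] = importlib.util.find_spec(name) is not None
        except ModuleNotFoundError:
            status[name] = False  # a parent package is missing entirely
    return status

# Module paths used by the new Airflow 2 provider imports above:
NEW_MODULES = [
    "airflow.providers.amazon.aws.operators.emr_add_steps",
    "airflow.providers.amazon.aws.operators.emr_create_job_flow",
    "airflow.providers.amazon.aws.operators.emr_terminate_job_flow",
    "airflow.providers.amazon.aws.sensors.emr_step",
    "airflow.providers.amazon.aws.hooks.s3",
]

for name, ok in module_status(NEW_MODULES).items():
    print(name, "importable" if ok else "NOT importable")
```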
