我设置了一些 dags,最终以 spark-submit 命令结束 spark 集群。如果这有所作为,我正在使用集群模式。无论如何,所以我的代码有效,但我意识到如果火花作业失败,我不一定会从 Airflow UI 中知道。通过集群模式触发作业,Airflow 将作业交给可用的工作人员,因此气流不知道火花作业。
我该如何解决这个问题?
我设置了一些 dags,最终以 spark-submit 命令结束 spark 集群。如果这有所作为,我正在使用集群模式。无论如何,所以我的代码有效,但我意识到如果火花作业失败,我不一定会从 Airflow UI 中知道。通过集群模式触发作业,Airflow 将作业交给可用的工作人员,因此气流不知道火花作业。
我该如何解决这个问题?
气流(从 1.8 版开始)有
SparkSqlOperator - https://github.com/apache/incubator-airflow/blob/master/airflow/contrib/operators/spark_sql_operator.py ;
SparkSQLHook code - https://github.com/apache/incubator-airflow/blob/master/airflow/contrib/hooks/spark_sql_hook.py
SparkSubmitOperator - https://github.com/apache/incubator-airflow/blob/master/airflow/contrib/operators/spark_submit_operator.py
SparkSubmitHook code - https://github.com/apache/incubator-airflow/blob/master/airflow/contrib/hooks/spark_submit_hook.py
如果您使用这些,如果 spark 作业失败,则气流任务将失败。如果您使用 spark1.x 获取实时日志,您可能需要更改 spark_submit_hook 文件中的日志记录部分,因为 spark-submit 甚至会将某些 1.x 版本的错误记录到标准输出(我必须对 1.6.x 版本进行更改)。 1)。
另请注意,自上次稳定版本以来,SparkSubmitOperator 已进行了许多改进。
您可以考虑使用client
模式,因为在 spark 作业完成之前客户端不会终止。气流执行器可以获取退出代码。
否则,您可能需要使用作业服务器。查看https://github.com/spark-jobserver/spark-jobserver
您可以开始利用 LivyOperator 来监控作业,LivyOperator 将在您配置轮询的时间间隔轮询 Spark 作业的状态。例子:
kickoff_streamer_task = LivyOperator(
task_id='kickoff_streamer_task',
dag=dag,
livy_conn_id='lokori',
file='abfs://data@amethiaprime.dfs.core.windows.net/user/draxuser/drax_streamer.jar',
**polling_interval=60**, # used when you want to pull the status of submitted job
queue='root.ids.draxuser',
proxy_user='draxuser',
args=['10', '3000'],
num_executors=4,
conf={
'spark.shuffle.compress': 'false',
'master': 'yarn',
'deploy_mode': 'cluster',
'spark.ui.view.acls': '*'
},
class_name='com.apple.core.drax.dpaas.batch.DraxMatrixProducer',
on_success_callback=livy_callback,
on_failure_callback=_failure_callback
)
在上面的示例中,polling_interval 设置为 60 秒,它将在 60 秒时继续轮询您的作业状态,它会确保为您提供正确的作业状态。