3

我设置了一些 dags,最终以 spark-submit 命令结束 spark 集群。如果这有所作为,我正在使用集群模式。无论如何,所以我的代码有效,但我意识到如果火花作业失败,我不一定会从 Airflow UI 中知道。通过集群模式触发作业,Airflow 将作业交给可用的工作人员,因此气流不知道火花作业。

我该如何解决这个问题?

4

3 回答 3

4

气流(从 1.8 版开始)有

SparkSqlOperator - https://github.com/apache/incubator-airflow/blob/master/airflow/contrib/operators/spark_sql_operator.py ;
SparkSQLHook code - https://github.com/apache/incubator-airflow/blob/master/airflow/contrib/hooks/spark_sql_hook.py
SparkSubmitOperator - https://github.com/apache/incubator-airflow/blob/master/airflow/contrib/operators/spark_submit_operator.py
SparkSubmitHook code - https://github.com/apache/incubator-airflow/blob/master/airflow/contrib/hooks/spark_submit_hook.py

如果您使用这些,如果 spark 作业失败,则气流任务将失败。如果您使用 spark1.x 获取实时日志,您可能需要更改 spark_submit_hook 文件中的日志记录部分,因为 spark-submit 甚至会将某些 1.x 版本的错误记录到标准输出(我必须对 1.6.x 版本进行更改)。 1)。

另请注意,自上次稳定版本以来,SparkSubmitOperator 已进行了许多改进。

于 2017-06-14T14:22:00.590 回答
2

您可以考虑使用client模式,因为在 spark 作业完成之前客户端不会终止。气流执行器可以获取退出代码。

否则,您可能需要使用作业服务器。查看https://github.com/spark-jobserver/spark-jobserver

于 2017-05-18T06:33:11.893 回答
0

您可以开始利用 LivyOperator 来监控作业,LivyOperator 将在您配置轮询的时间间隔轮询 Spark 作业的状态。例子:

kickoff_streamer_task = LivyOperator(
    task_id='kickoff_streamer_task',
    dag=dag,
    livy_conn_id='lokori',
    file='abfs://data@amethiaprime.dfs.core.windows.net/user/draxuser/drax_streamer.jar',
    **polling_interval=60**,  # used when you want to pull the status of submitted job
    queue='root.ids.draxuser',
    proxy_user='draxuser',
    args=['10', '3000'],
    num_executors=4,
    conf={
        'spark.shuffle.compress': 'false',
        'master': 'yarn',
        'deploy_mode': 'cluster',
        'spark.ui.view.acls': '*'
    },
    class_name='com.apple.core.drax.dpaas.batch.DraxMatrixProducer',
    on_success_callback=livy_callback,
    on_failure_callback=_failure_callback
)

在上面的示例中,polling_interval 设置为 60 秒,它将在 60 秒时继续轮询您的作业状态,它会确保为您提供正确的作业状态。

于 2021-06-12T11:45:00.613 回答