如何配置 Airflow,使 DAG 中的任何失败都能(立即)触发一条 Slack 消息?
你可以使用 airflow.providers.slack.hooks.slack_webhook.SlackWebhookHook,
并在 DAG 级别向 on_failure_callback 传递一个函数来实现这一点。
如果能提供把失败任务的名称传入消息的方法,还有加分 ;-)
def fail():
    """Always raise, so that the task calling this function fails.

    Raises:
        Exception: unconditionally, with a fixed test message.
    """
    raise Exception("Task failed intentionally for testing purpose")
def success():
    """Print ``success`` to standard output and return ``None``."""
    print("success")
def task_fail_slack_alert(context):
    """Send a Slack message listing the failed tasks of the current DAG run.

    Written to be passed as an ``on_failure_callback``. It looks up the DAG
    run through the task instance in ``context``, collects every task
    instance whose state is ``State.FAILED``, and posts a three-section
    message (headline, DAG id + execution time, failed task list) through
    ``SlackWebhookHook``.

    Args:
        context: Callback context mapping. Must contain ``ti`` and
            ``task_instance``; ``execution_date`` is read with ``.get`` and
            is rendered as ``None`` when absent.

    Returns:
        None.
    """
    # Render each failed task as "<log_url|task_id>" so the task name shown
    # in the message links to that task's log.
    failed_tasks = [
        f"<{ti.log_url}|{ti.task_id}>"
        for ti in context['ti'].get_dagrun().get_task_instances()
        if ti.state == State.FAILED
    ]

    dag = context.get('task_instance').dag_id
    exec_date = context.get('execution_date')

    blocks = [
        # Headline.
        {
            "type": "section",
            "text": {
                "type": "mrkdwn",
                "text": ":red_circle: Dag Failed."
            }
        },
        # DAG id and execution time, with an image accessory.
        {
            "type": "section",
            # Random suffix gives every message a distinct block_id.
            "block_id": f"section{uuid.uuid4()}",
            "text": {
                "type": "mrkdwn",
                "text": f"*Dag*: {dag} \n *Execution Time*: {exec_date}"
            },
            "accessory": {
                "type": "image",
                "image_url": "https://raw.githubusercontent.com/apache/airflow/main/airflow/www/static/pin_100.png",
                "alt_text": "Airflow"
            }
        },
        # Comma-separated list of linked failed task ids.
        {
            "type": "section",
            "text": {
                "type": "mrkdwn",
                "text": f"Failed Tasks: {', '.join(failed_tasks)}"
            }
        }
    ]

    # NOTE(review): constructor arguments and execute() follow the hook API
    # this example was written against; newer Slack provider releases may
    # expect a different call shape -- confirm against the installed version.
    failed_alert = SlackWebhookHook(
        http_conn_id='slack-airflow',
        channel="#airflow-notifications",
        blocks=blocks,
        username='airflow'
    )
    failed_alert.execute()
# Handed to the DAG definition as its ``default_args``.
default_args = {"owner": "airflow"}
# Test DAG for the Slack failure alert: no schedule interval is set, and
# task_fail_slack_alert is registered as the DAG-level failure callback.
with DAG(
    dag_id="slack-test",
    default_args=default_args,
    start_date=datetime(2021, 8, 19),
    schedule_interval=None,
    on_failure_callback=task_fail_slack_alert
) as dag:
    # Calls fail(), which always raises, so this task fails on purpose.
    task_1 = PythonOperator(
        task_id="slack_notification_test",
        python_callable=fail
    )
    # Calls success(), which only prints; no dependency on task_1 is declared.
    task_2 = PythonOperator(
        task_id="slack_notification_test2",
        python_callable=success
    )