
How do I configure Airflow so that any failure in a DAG (immediately) results in a Slack message?

At the moment I handle this by creating a slack_failed_task:

from airflow.operators.slack_operator import SlackAPIPostOperator

slack_failed_task = SlackAPIPostOperator(
    task_id='slack_failed',
    channel="#datalabs",
    trigger_rule='one_failed',
    token="...",
    text=':red_circle: DAG Failed',
    icon_url='http://airbnb.io/img/projects/airflow3.png',
    dag=dag)

and making every other task in the DAG upstream of this task (which uses trigger_rule='one_failed'):

slack_failed_task << download_task_a
slack_failed_task << download_task_b
slack_failed_task << process_task_c
slack_failed_task << process_task_d
slack_failed_task << other_task_e

It works, but it's error-prone: forgetting to add a task silently skips the Slack notification, and it feels like a lot of work.

Is there a way to extend the email_on_failure property of the DAG?

Bonus ;-) for including a way to pass the name of the failed task to the message.


5 Answers


Maybe this example will help:

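from airflow.operators.python_operator import PythonOperator
from airflow.operators.slack_operator import SlackAPIPostOperator


def slack_failed_task(context):
    # Airflow calls on_failure_callback with the task's context dict.
    failed_alert = SlackAPIPostOperator(
        task_id='slack_failed',
        channel="#datalabs",
        token="...",
        text=':red_circle: DAG Failed',
        owner='_owner',
    )
    # Call execute() so the message is actually posted.
    return failed_alert.execute(context=context)


task_with_failed_slack_alerts = PythonOperator(
    task_id='task0',
    python_callable=<file to execute>,
    on_failure_callback=slack_failed_task,
    provide_context=True,
    dag=dag)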
answered 2017-06-27T18:40:07.350

Try the new SlackWebhookOperator, available in Airflow versions >= 1.10.0:

import os

from airflow.contrib.operators.slack_webhook_operator import SlackWebhookOperator

slack_msg="Hi Wssup?"

slack_test =  SlackWebhookOperator(
    task_id='slack_test',
    http_conn_id='slack_connection',
    webhook_token='/1234/abcd',
    message=slack_msg,
    channel='#airflow_updates',
    username='airflow_'+os.environ['ENVIRONMENT'],
    icon_emoji=None,
    link_names=False,
    dag=dag)

Note: make sure you have added slack_connection as an Airflow connection with

host=https://hooks.slack.com/services/
answered 2018-12-04T06:12:18.260

BaseOperator supports an "on_failure_callback" parameter:

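on_failure_callback (callable) – a function to be called when a task instance of this task fails. A context dictionary is passed as a single parameter to this function. The context contains references to related objects of the task instance and is documented under the macros section of the API.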

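I haven't tested this, but you should be able to define a function that posts to Slack on failure and pass it to each task definition. Inside the callback, the name of the current task is available as context['task_instance'].task_id (in a templated field you would write {{ task.task_id }}).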
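A minimal sketch of that idea, kept free of Airflow imports so it runs anywhere: the hypothetical factory make_failure_callback wraps any send(text) function and returns a callback that reads the failed task's name from context['task_instance']. The actual Slack call (an operator, a hook or a plain HTTP POST) is whatever you plug in as send; the default_args line at the end only illustrates handing one callback to every task at once instead of repeating it per task.

```python
from typing import Any, Callable, Mapping

Context = Mapping[str, Any]


def make_failure_callback(send: Callable[[str], None]) -> Callable[[Context], None]:
    """Return an on_failure_callback that reports the failed task via `send`.

    `make_failure_callback` and `send` are hypothetical names for this sketch;
    `send` could wrap a Slack webhook POST or a Slack operator/hook.
    """

    def on_failure(context: Context) -> None:
        # Airflow passes the template context dict; the failed task instance
        # is stored under the "task_instance" key (also available as "ti").
        ti = context["task_instance"]
        send(f":red_circle: Task {ti.task_id} in DAG {ti.dag_id} failed")

    return on_failure


# Illustration: every task built with default_args=default_args gets the callback.
default_args = {"on_failure_callback": make_failure_callback(print)}
```

Because default_args is applied to every operator in the DAG, a new task cannot be "forgotten" the way it can with the one_failed task wired up by hand.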

answered 2017-06-26T09:29:20.743

How do I configure Airflow so that any failure in a DAG (immediately) results in a Slack message?

Using airflow.providers.slack.hooks.slack_webhook.SlackWebhookHook, you can achieve this by passing a function to on_failure_callback at the DAG level.

Bonus ;-) for including a way to pass the name of the failed task to the message.


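import uuid
from datetime import datetime

from airflow import DAG
from airflow.operators.python import PythonOperator
from airflow.providers.slack.hooks.slack_webhook import SlackWebhookHook
from airflow.utils.state import State


def fail():
    raise Exception("Task failed intentionally for testing purpose")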

def success():
    print("success")

def task_fail_slack_alert(context):
    tis_dagrun = context['ti'].get_dagrun().get_task_instances()
    failed_tasks = []
    for ti in tis_dagrun:
        if ti.state == State.FAILED:
            # Adding log url
            failed_tasks.append(f"<{ti.log_url}|{ti.task_id}>")
    
    dag=context.get('task_instance').dag_id
    exec_date=context.get('execution_date')

    blocks = [
        {
            "type": "section",
            "text": {
                "type": "mrkdwn",
                "text": ":red_circle: Dag Failed."
            }
        },
        {
            "type": "section",
            "block_id": f"section{uuid.uuid4()}",
            "text": {
                "type": "mrkdwn",
                "text": f"*Dag*: {dag} \n *Execution Time*: {exec_date}"
            },
            "accessory": {
                "type": "image",
                "image_url": "https://raw.githubusercontent.com/apache/airflow/main/airflow/www/static/pin_100.png",
                "alt_text": "Airflow"
            }
        },
        {
            "type": "section",
            "text": {
                "type": "mrkdwn",
                "text": f"Failed Tasks: {', '.join(failed_tasks)}"
            }
        }
    ]
    failed_alert = SlackWebhookHook(
        http_conn_id='slack-airflow',
        channel="#airflow-notifications",    
        blocks=blocks,
        username='airflow'
    )
    failed_alert.execute()
    return 

default_args = {
    'owner': 'airflow'
}
with DAG(
    dag_id="slack-test",
    default_args=default_args,
    start_date=datetime(2021,8,19),
    schedule_interval=None,
    on_failure_callback=task_fail_slack_alert
) as dag:

    task_1 = PythonOperator(
        task_id="slack_notification_test",
        python_callable=fail
    )

    task_2 = PythonOperator(
        task_id="slack_notification_test2",
        python_callable=success
    )

(Screenshot: Slack message preview)
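The loop in task_fail_slack_alert that turns the DAG run's task instances into clickable Slack links can be pulled out and tested on its own. The sketch below uses hypothetical helpers failed_task_links and failed_tasks_text that accept any objects exposing task_id, state and log_url (as Airflow task instances do), and compares against the string "failed", the value of State.FAILED, so no Airflow import is needed. The "none found" fallback is this sketch's own addition, for DAG runs that fail without any task in the failed state.

```python
from typing import Any, Iterable, List

# String value of airflow.utils.state.State.FAILED; task instance states
# compare equal to it, so this sketch does not need to import Airflow.
FAILED = "failed"


def failed_task_links(task_instances: Iterable[Any]) -> List[str]:
    """Return Slack mrkdwn links of the form "<log_url|task_id>" for failed tasks."""
    return [f"<{ti.log_url}|{ti.task_id}>" for ti in task_instances if ti.state == FAILED]


def failed_tasks_text(task_instances: Iterable[Any]) -> str:
    """Build the text of the "Failed Tasks" block, with a fallback when none failed."""
    links = failed_task_links(task_instances)
    return "Failed Tasks: " + (", ".join(links) if links else "none found")
```

Inside the callback, the third block's text could then come from failed_tasks_text(context['ti'].get_dagrun().get_task_instances()).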

answered 2021-08-19T13:31:28.603

I prefer to attach the callback to the DAG itself instead of to each task; Airflow then calls it whenever a DAG run fails:

import json
import os
from datetime import datetime

import requests
from airflow import DAG


def on_failure_callback(context):
    webhook_url = os.getenv('SLACK_WEBHOOK_TOKEN')
    slack_data = {
        'text': "@here DAG {} Failed".format(context['dag'].dag_id)
    }

    response = requests.post(
        webhook_url, data=json.dumps(slack_data),
        headers={'Content-Type': 'application/json'}
    )

dag = DAG(
    dag_id='dag_with_templated_dir',
    start_date=datetime(2020, 1, 1),
    on_failure_callback=on_failure_callback
)
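If you'd rather not depend on requests, the same POST can be made with the standard library. This sketch assumes, as above, that the SLACK_WEBHOOK_TOKEN environment variable holds the full incoming-webhook URL; build_slack_request is a hypothetical helper that only constructs the request, so it can be inspected without sending anything. It writes the mention as <!here>, the special-mention syntax from Slack's message-formatting docs, since a literal "@here" in webhook text may not trigger a notification.

```python
import json
import os
import urllib.request
from typing import Any, Mapping


def build_slack_request(webhook_url: str, dag_id: str) -> urllib.request.Request:
    """Build (but do not send) the JSON POST announcing a failed DAG."""
    payload = {"text": f"<!here> DAG {dag_id} Failed"}
    return urllib.request.Request(
        webhook_url,
        data=json.dumps(payload).encode("utf-8"),
        headers={"Content-Type": "application/json"},
        method="POST",
    )


def on_failure_callback(context: Mapping[str, Any]) -> None:
    # Assumption: SLACK_WEBHOOK_TOKEN contains the complete webhook URL.
    request = build_slack_request(os.environ["SLACK_WEBHOOK_TOKEN"], context["dag"].dag_id)
    # urlopen raises urllib.error.HTTPError for non-2xx responses.
    with urllib.request.urlopen(request, timeout=10) as response:
        response.read()
```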
answered 2020-07-27T11:50:33.507