6

I have been trying to get a Slack message callback to trigger on SLA misses. I've noticed that:

  1. SLA misses get registered successfully in the Airflow web UI at slamiss/list/

  2. on_failure_callback works successfully

However, the sla_miss_callback function itself is never triggered.

What I've tried:

  • Different combinations of adding sla and sla_miss_callback at the default_args level, the DAG level, and the task level

  • Checking our scheduler and worker logs for SLA-related messages (see also here), but we aren't seeing anything

  • The Slack message callback function works if it is called from any other basic task or function

default_args = {
    "owner": "airflow",
    "depends_on_past": False,
    'start_date': airflow.utils.dates.days_ago(n=0,minute=1),
    'on_failure_callback': send_task_failed_msg_to_slack,
    'sla': timedelta(minutes=1),
    "retries": 0, 
    "pool": 'canary',
    'priority_weight': 1
}

dag = airflow.DAG(
    dag_id='sla_test',
    default_args=default_args,
    sla_miss_callback=send_sla_miss_message_to_slack,
    schedule_interval='*/5 * * * *',
    catchup=False,
    max_active_runs=1,
    dagrun_timeout=timedelta(minutes=5)
)

def sleep():
    """ Sleep for 90 seconds to exceed the 1-minute SLA """
    time.sleep(90)
    LOGGER.info("Slept for 90 seconds")

def simple_print(**context):
    """ Prints a message """
    print("Hello World!")


sleep = PythonOperator(
    task_id="sleep",
    python_callable=sleep,
    dag=dag
)

simple_task = PythonOperator(
    task_id="simple_task",
    python_callable=simple_print,
    provide_context=True,
    dag=dag
)

sleep >> simple_task

7 Answers

4

I ran into a similar situation once.
While investigating the scheduler logs, I found the following error:

[2020-07-08 09:14:32,781] {scheduler_job.py:534} INFO -  --------------> ABOUT TO CALL SLA MISS CALL BACK  
[2020-07-08 09:14:32,781] {scheduler_job.py:541} ERROR - Could not call sla_miss_callback for DAG 
sla_miss_alert() takes 1 positional arguments but 5 were given

The problem is that your sla_miss_callback function only expects 1 argument, but it should actually look like this:

def sla_miss_alert(dag, task_list, blocking_task_list, slas, blocking_tis):
    """Function that alerts me that dag_id missed sla"""
    # <function code here>

For reference, take a look at the Airflow source code.

Note: Don't put sla_miss_callback=sla_miss_alert in default_args. It should be defined in the DAG definition itself.
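
A minimal sketch of that wiring (the DAG id and the print call are just illustrative; swap in your own alerting logic):

from datetime import timedelta

from airflow import DAG
from airflow.utils.dates import days_ago


def sla_miss_alert(dag, task_list, blocking_task_list, slas, blocking_tis):
    """Invoked by the scheduler with all five arguments when an SLA is missed."""
    # Replace this print with your own alerting (Slack, email, etc.)
    print(f"SLA missed in DAG {dag.dag_id}: {task_list}")


dag = DAG(
    dag_id="sla_example",
    default_args={
        "start_date": days_ago(1),
        "sla": timedelta(minutes=1),      # per-task SLA can live in default_args
    },
    sla_miss_callback=sla_miss_alert,     # but the callback goes on the DAG itself
    schedule_interval="*/5 * * * *",
    catchup=False,
)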

answered 2020-07-08T14:02:17.250
4

Example with both SLA missed and Execution Timeout alerts:

  • First, you will get an SLA missed alert after the task has been running for 2 minutes,
  • then, after 4 minutes, the task will fail with an Execution Timeout alert.
"sla": timedelta(minutes=2),  # Default Task SLA time
"execution_timeout": timedelta(minutes=4),  # Default Task Execution Timeout

Also, you have the log_url right in the message, so you can easily open the task log in Airflow.

Example Slack messages (screenshot omitted)

import time
from datetime import datetime, timedelta
from textwrap import dedent
from typing import Any, Dict, List, Optional, Tuple

from airflow import AirflowException
from airflow.contrib.operators.slack_webhook_operator import SlackWebhookOperator
from airflow.exceptions import AirflowTaskTimeout
from airflow.hooks.base_hook import BaseHook
from airflow.models import DAG, TaskInstance
from airflow.operators.python_operator import PythonOperator

SLACK_STATUS_TASK_FAILED = ":red_circle: Task Failed"
SLACK_STATUS_EXECUTION_TIMEOUT = ":alert: Task Failed by Execution Timeout."


def send_slack_alert_sla_miss(
        dag: DAG,
        task_list: str,
        blocking_task_list: str,
        slas: List[Tuple],
        blocking_tis: List[TaskInstance],
) -> None:
    """Send `SLA missed` alert to Slack"""
    task_instance: TaskInstance = blocking_tis[0]
    message = dedent(
        f"""
        :warning: Task SLA missed.
        *DAG*: {dag.dag_id}
        *Task*: {task_instance.task_id}
        *Execution Time*: {task_instance.execution_date.strftime("%Y-%m-%d %H:%M:%S")} UTC
        *SLA Time*: {task_instance.task.sla}
        _* Time by which the job is expected to succeed_
        *Task State*: `{task_instance.state}`
        *Blocking Task List*: {blocking_task_list}
        *Log URL*: {task_instance.log_url}
        """
    )
    send_slack_alert(message=message)


def send_slack_alert_task_failed(context: Dict[str, Any]) -> None:
    """Send `Task Failed` notification to Slack"""
    task_instance: TaskInstance = context.get("task_instance")
    exception: AirflowException = context.get("exception")

    status = SLACK_STATUS_TASK_FAILED
    if isinstance(exception, AirflowTaskTimeout):
        status = SLACK_STATUS_EXECUTION_TIMEOUT

    # Prepare formatted Slack message
    message = dedent(
        f"""
        {status}
        *DAG*: {task_instance.dag_id}
        *Task*: {task_instance.task_id}
        *Execution Time*: {context.get("execution_date").to_datetime_string()} UTC
        *SLA Time*: {task_instance.task.sla}
        _* Time by which the job is expected to succeed_
        *Execution Timeout*: {task_instance.task.execution_timeout}
        _** Max time allowed for the execution of this task instance_
        *Task Duration*: {timedelta(seconds=round(task_instance.duration))}
        *Task State*: `{task_instance.state}`
        *Exception*: {exception}
        *Log URL*: {task_instance.log_url}
        """
    )
    send_slack_alert(
        message=message,
        context=context,
    )


def send_slack_alert(
        message: str,
        context: Optional[Dict[str, Any]] = None,
) -> None:
    """Send prepared message to Slack"""
    slack_webhook_token = BaseHook.get_connection("slack").password
    notification = SlackWebhookOperator(
        task_id="slack_notification",
        http_conn_id="slack",
        webhook_token=slack_webhook_token,
        message=message,
        username="airflow",
    )
    notification.execute(context)


# These args will get passed on to each operator
# You can override them on a per-task basis during operator initialization
default_args = {
    "owner": "airflow",
    "email": ["test@test,com"],
    "email_on_failure": True,
    "depends_on_past": False,
    "retry_delay": timedelta(minutes=5),
    "sla": timedelta(minutes=2),  # Default Task SLA time
    "execution_timeout": timedelta(minutes=4),  # Default Task Execution Timeout
    "on_failure_callback": send_slack_alert_task_failed,
}

with DAG(
        dag_id="test_sla",
        schedule_interval="*/5 * * * *",
        start_date=datetime(2021, 1, 11),
        default_args=default_args,
        sla_miss_callback=send_slack_alert_sla_miss,  # Must be set here, not in default_args!
) as dag:
    delay_python_task = PythonOperator(
        task_id="delay_five_minutes_python_task",
        python_callable=lambda: time.sleep(300),
    )
answered 2021-01-11T18:53:48.567
2

It seems that the only way to get sla_miss_callback to work is to explicitly declare the parameters it needs... nothing else worked for me; the arguments 'dag', 'task_list', 'blocking_task_list', 'slas' and 'blocking_tis' were simply not being sent to the callback any other way.

TypeError: print_sla_miss() missing 5 required positional arguments: 'dag', 'task_list', 'blocking_task_list', 'slas', and 'blocking_tis'
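
A minimal sketch of a callback that satisfies that signature (the logging calls are just illustrative):

import logging


def print_sla_miss(dag, task_list, blocking_task_list, slas, blocking_tis):
    """Receives all five arguments positionally from the scheduler."""
    logging.info("SLA missed in DAG %s, tasks: %s", dag.dag_id, task_list)
    # Each entry in slas is an SlaMiss record with task_id / execution_date
    for sla in slas:
        logging.info("Missed: task=%s, execution_date=%s", sla.task_id, sla.execution_date)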

answered 2020-10-26T15:55:05.020
0

A lot of these answers got 90% of the way there, so I wanted to share my example using bash operators, which combines what I found in all of the responses above and in other resources.

The most important pieces are defining sla_miss_callback in the DAG definition rather than in default_args, and not passing context into the SLA function.

"""
A simple example showing the basics of using a custom SLA notification response.
"""
from airflow import DAG
from airflow.operators.bash_operator import BashOperator
from datetime import timedelta, datetime
from airflow.operators.slack_operator import SlackAPIPostOperator
from slack import slack_attachment
from airflow.hooks.base_hook import BaseHook
import urllib

#slack alert for sla_miss
def slack_sla_miss(dag, task_list, blocking_task_list, slas, blocking_tis):
    dag_id = slas[0].dag_id
    task_id = slas[0].task_id
    execution_date = slas[0].execution_date.isoformat()
    base_url = 'webserver_url_here'
    encoded_execution_date = urllib.parse.quote_plus(execution_date)
    dag_url = (f'{base_url}/graph?dag_id={dag_id}'
               f'&execution_date={encoded_execution_date}')
    message = (f':alert: *Airflow SLA Miss*'
               f'\n\n'
               f'*DAG:* {dag_id}\n'
               f'*Task:* {task_id}\n'
               f'*Execution Date:* {execution_date}'
               f'\n\n'
               f'<{dag_url}|Click here to view DAG>')

    sla_miss_alert = SlackAPIPostOperator(
        task_id='slack_sla_miss',
        channel='airflow-alerts-test',
        token=str(BaseHook.get_connection("slack").password),
        text = message
    )
    return sla_miss_alert.execute()

#slack alert for successful task completion
def slack_success_task(context):
    success_alert = SlackAPIPostOperator(
        task_id='slack_success',
        channel='airflow-alerts-test',
        token=str(BaseHook.get_connection("slack").password),
        text = "Test successful"
    )
    return success_alert.execute(context=context)


default_args = {
    "depends_on_past": False,
    'start_date': datetime(2020, 11, 18),
    "retries": 0
}

# Create a basic DAG with our args
# Note: Don't put sla_miss_callback=sla_miss_alert in default_args. It should be defined in the DAG definition itself.
dag = DAG(
    dag_id='sla_slack_v6',
    default_args=default_args,
    sla_miss_callback=slack_sla_miss,
    catchup=False,
   # A common interval to make the job fire when we run it
    schedule_interval=timedelta(minutes=3)
)

# Add a task that will always fail the SLA
t1 = BashOperator(
    task_id='timeout_test_sla_miss',
    # Sleep 60 seconds to guarantee we miss the SLA
    bash_command='sleep 60',
    # Do not retry so the SLA miss fires after the first execution
    retries=0,
    #on_success_callback = slack_success_task,
    provide_context = True,
    # Set our task up with a 10 second SLA
    sla=timedelta(seconds=10),
    dag=dag
    )

t2 = BashOperator(
    task_id='timeout_test_sla_miss_task_2',
    # Sleep 30 seconds to guarantee we miss the SLA of 20 seconds set in this task
    bash_command='sleep 30',
    # Do not retry so the SLA miss fires after the first execution
    retries=0,
    #on_success_callback = slack_success_task,
    provide_context = True,
    # Set our task up with a 20 second SLA
    sla=timedelta(seconds=20),
    dag=dag
    )

t3 = BashOperator(
    task_id='timeout_test_sla_miss_task_3',
    # Sleep 60 seconds to guarantee we miss the SLA
    bash_command='sleep 60',
    # Do not retry so the SLA miss fires after the first execution
    retries=0,
    #on_success_callback = slack_success_task,
    provide_context = True,
    # Set our task up with a 30 second SLA
    sla=timedelta(seconds=30),
    dag=dag
    )
 
t1 >> t2 >> t3
answered 2020-12-10T14:23:08.363
0

I ran into the same problem, but was able to get it working with the code below:


import logging as log
import airflow
import time
from datetime import timedelta
# The DAG object; we'll need this to instantiate a DAG
from airflow import DAG
# Operators; we need this to operate!
from airflow.operators.python_operator import PythonOperator
from airflow import configuration
import urllib
from airflow.operators.slack_operator import SlackAPIPostOperator


def sleep():
    """ Sleep for 2 minutes """
    time.sleep(60*2)
    log.info("Slept for 2 minutes")


def simple_print(**context):
    """ Prints a message """
    print("Hello World!")


def slack_on_sla_miss(dag,
                      task_list,
                      blocking_task_list,
                      slas,
                      blocking_tis):

    log.info('Running slack_on_sla_miss')

    slack_conn_id = 'slack_default'
    slack_channel = '#general'

    dag_id = slas[0].dag_id
    task_id = slas[0].task_id
    execution_date = slas[0].execution_date.isoformat()

    base_url = configuration.get('webserver', 'BASE_URL')
    encoded_execution_date = urllib.parse.quote_plus(execution_date)
    dag_url = (f'{base_url}/graph?dag_id={dag_id}'
               f'&execution_date={encoded_execution_date}')

    message = (f':o: *Airflow SLA Miss*'
               f'\n\n'
               f'*DAG:* {dag_id}\n'
               f'*Task:* {task_id}\n'
               f'*Execution Date:* {execution_date}'
               f'\n\n'
               f'<{dag_url}|Click here to view>')

    slack_op = SlackAPIPostOperator(task_id='slack_failed',
                                    slack_conn_id=slack_conn_id,
                                    channel=slack_channel,
                                    text=message)
    slack_op.execute()


default_args = {
    "owner": "airflow",
    "depends_on_past": False,
    'start_date': airflow.utils.dates.days_ago(n=0, minute=1),
    "retries": 0,
    'priority_weight': 1,
}

dag = DAG(
    dag_id='sla_test',
    default_args=default_args,
    sla_miss_callback=slack_on_sla_miss,
    schedule_interval='*/5 * * * *',
    catchup=False,
    max_active_runs=1,
)

with dag:
    sleep = PythonOperator(
        task_id="sleep",
        python_callable=sleep,
        )

    simple_task = PythonOperator(
        task_id="simple_task",
        python_callable=simple_print,
        provide_context=True,
        sla=timedelta(minutes=1),
        )

    sleep >> simple_task
answered 2020-09-08T13:03:08.743
0

I think the Airflow documentation is a bit vague on this.

Instead of a method signature like

def slack_sla_miss(dag, task_list, blocking_task_list, slas, blocking_tis)

modify your signature like this:

def slack_sla_miss(*args, **kwargs)

That way all the arguments get passed, and you won't get the error you were seeing in the logs.
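
A minimal sketch of that variant; it assumes the arguments still arrive positionally in the order (dag, task_list, blocking_task_list, slas, blocking_tis) shown in the other answers:

import logging


def slack_sla_miss(*args, **kwargs):
    """Accept whatever the scheduler passes, then unpack what we need."""
    # Assumed positional order: dag, task_list, blocking_task_list, slas, blocking_tis
    dag, task_list, blocking_task_list, slas, blocking_tis = args[:5]
    logging.info("SLA missed in DAG %s: %s", dag.dag_id, task_list)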

I learned about this here - https://www.cloudwalker.io/2020/12/15/airflow-sla-management/

answered 2021-03-08T10:32:06.777
-1

I have run into this issue myself. Unlike on_failure_callback, which looks for a Python callable, it seems that sla_miss_callback needs the full function call.

An example that worked for me:

def sla_miss_alert(dag_id):
    """
    Function that alerts me that dag_id missed sla
    """
    <function code here>

def task_failure_alert(dag_id, context):
    """
    Function that alerts me that a task failed
    """
    <function code here>


dag_id = 'sla_test'
default_args = {
    "owner": "airflow",
    "depends_on_past": False,
    'start_date': airflow.utils.dates.days_ago(n=0,minute=1),
    'on_failure_callback': partial(task_failure_alert, dag_id),
    'sla': timedelta(minutes=1),
    "retries": 0, 
    "pool": 'canary',
    'priority_weight': 1
}

dag = airflow.DAG(
    dag_id='sla_test',
    default_args=default_args,
    sla_miss_callback=sla_miss_alert(dag_id),
    schedule_interval='*/5 * * * *',
    catchup=False,
    max_active_runs=1,
    dagrun_timeout=timedelta(minutes=5)
)

From what I can tell, sla_miss_callback doesn't have access to the context, which is unfortunate. Once I stopped looking for the context, I finally got my alerts.

answered 2020-03-30T02:39:27.237