使用 `SLA missed` 和 `Execution Timeout` 警报的示例:
- 首先,任务运行 2 分钟后,你会收到 `SLA missed` 警报;
- 然后,任务运行 4 分钟后将失败,并发出 `Execution Timeout` 警报。
"sla": timedelta(minutes=2), # Default Task SLA time
"execution_timeout": timedelta(minutes=4), # Default Task Execution Timeout
此外,消息中直接包含 `log_url`,因此您可以轻松地在 Airflow 中打开任务日志。
Slack 消息示例
import time
from datetime import datetime, timedelta
from textwrap import dedent
from typing import Any, Dict, List, Optional, Tuple
from airflow import AirflowException
from airflow.contrib.operators.slack_webhook_operator import SlackWebhookOperator
from airflow.exceptions import AirflowTaskTimeout
from airflow.hooks.base_hook import BaseHook
from airflow.models import DAG, TaskInstance
from airflow.operators.python_operator import PythonOperator
# Headline placed at the top of the Slack message for a generic task failure.
SLACK_STATUS_TASK_FAILED = ":red_circle: Task Failed"
# Headline used instead when the failure exception is an AirflowTaskTimeout.
SLACK_STATUS_EXECUTION_TIMEOUT = ":alert: Task Failed by Execution Timeout."
def send_slack_alert_sla_miss(
    dag: DAG,
    task_list: str,
    blocking_task_list: str,
    slas: List[Tuple],
    blocking_tis: List[TaskInstance],
) -> None:
    """Send an `SLA missed` alert to Slack.

    Registered as the DAG-level ``sla_miss_callback`` in this file.

    Args:
        dag: DAG whose ``dag_id`` is reported in the message.
        task_list: Description of the tasks that missed their SLA. Only used
            in the fallback message when ``blocking_tis`` is empty.
        blocking_task_list: Description of the blocking tasks, included in
            the message verbatim.
        slas: SLA-miss records supplied by the caller; not used here.
        blocking_tis: Blocking task instances. Only the first one is
            described in detail (task id, execution date, SLA, state, log
            URL).
    """
    if not blocking_tis:
        # Without a task instance there is nothing to index into; previously
        # this raised IndexError and no alert was sent at all. Fall back to a
        # reduced message built from the textual arguments.
        # NOTE(review): presumably the list is empty when the late tasks have
        # already finished by the time the callback fires -- confirm.
        fallback_template = dedent(
            """
            :warning: Task SLA missed.
            *DAG*: {dag_id}
            *Task List*: {task_list}
            *Blocking Task List*: {blocking_task_list}
            """
        )
        send_slack_alert(
            message=fallback_template.format(
                dag_id=dag.dag_id,
                task_list=task_list,
                blocking_task_list=blocking_task_list,
            )
        )
        return

    task_instance: TaskInstance = blocking_tis[0]

    # Dedent the template *before* substituting values: a multi-line value
    # (e.g. a newline-joined blocking task list) would otherwise introduce
    # unindented lines and prevent dedent from stripping the margin.
    template = dedent(
        """
        :warning: Task SLA missed.
        *DAG*: {dag_id}
        *Task*: {task_id}
        *Execution Time*: {execution_time} UTC
        *SLA Time*: {sla}
        _* Time by which the job is expected to succeed_
        *Task State*: `{state}`
        *Blocking Task List*: {blocking_task_list}
        *Log URL*: {log_url}
        """
    )
    message = template.format(
        dag_id=dag.dag_id,
        task_id=task_instance.task_id,
        execution_time=task_instance.execution_date.strftime("%Y-%m-%d %H:%M:%S"),
        sla=task_instance.task.sla,
        state=task_instance.state,
        blocking_task_list=blocking_task_list,
        log_url=task_instance.log_url,
    )
    send_slack_alert(message=message)
def send_slack_alert_task_failed(context: Dict[str, Any]) -> None:
    """Send a `Task Failed` notification to Slack.

    Registered as ``on_failure_callback`` in ``default_args`` in this file.
    The headline switches to the execution-timeout variant when the failure
    exception is an ``AirflowTaskTimeout``.

    Args:
        context: Task context dictionary. The keys ``task_instance``,
            ``exception`` and ``execution_date`` are read from it, and the
            whole dictionary is forwarded to ``send_slack_alert``.
    """
    task_instance: TaskInstance = context.get("task_instance")
    exception: AirflowException = context.get("exception")

    status = (
        SLACK_STATUS_EXECUTION_TIMEOUT
        if isinstance(exception, AirflowTaskTimeout)
        else SLACK_STATUS_TASK_FAILED
    )

    # `duration` can be None (e.g. when start/end dates were never recorded);
    # round(None) would raise TypeError and the alert would never be sent.
    duration = task_instance.duration
    task_duration = (
        "unknown" if duration is None else timedelta(seconds=round(duration))
    )

    # Dedent the template *before* substituting values so that a multi-line
    # exception message cannot defeat the margin stripping.
    template = dedent(
        """
        {status}
        *DAG*: {dag_id}
        *Task*: {task_id}
        *Execution Time*: {execution_time} UTC
        *SLA Time*: {sla}
        _* Time by which the job is expected to succeed_
        *Execution Timeout*: {execution_timeout}
        _** Max time allowed for the execution of this task instance_
        *Task Duration*: {task_duration}
        *Task State*: `{state}`
        *Exception*: {exception}
        *Log URL*: {log_url}
        """
    )
    message = template.format(
        status=status,
        dag_id=task_instance.dag_id,
        task_id=task_instance.task_id,
        execution_time=context.get("execution_date").to_datetime_string(),
        sla=task_instance.task.sla,
        execution_timeout=task_instance.task.execution_timeout,
        task_duration=task_duration,
        state=task_instance.state,
        exception=exception,
        log_url=task_instance.log_url,
    )
    send_slack_alert(
        message=message,
        context=context,
    )
def send_slack_alert(
    message: str,
    context: Optional[Dict[str, Any]] = None,
) -> None:
    """Post an already formatted message to Slack.

    The webhook token is read from the password field of the Airflow
    connection named ``slack``; the message is then delivered by executing
    a ``SlackWebhookOperator`` directly.

    Args:
        message: Text to post.
        context: Optional task context handed to the operator's ``execute``.
    """
    slack_connection = BaseHook.get_connection("slack")
    operator_kwargs = {
        "task_id": "slack_notification",
        "http_conn_id": "slack",
        "webhook_token": slack_connection.password,
        "message": message,
        "username": "airflow",
    }
    # Run the operator inline instead of scheduling it as a task.
    SlackWebhookOperator(**operator_kwargs).execute(context)
# These args will get passed on to each operator.
# You can override them on a per-task basis during operator initialization.
default_args = {
    "owner": "airflow",
    # The address used to read "test@test,com" (comma instead of a dot),
    # which is not a valid e-mail address for failure notifications.
    "email": ["test@test.com"],
    "email_on_failure": True,
    "depends_on_past": False,
    "retry_delay": timedelta(minutes=5),
    "sla": timedelta(minutes=2),  # Default Task SLA time
    "execution_timeout": timedelta(minutes=4),  # Default Task Execution Timeout
    # Slack notification sent whenever a task fails.
    "on_failure_callback": send_slack_alert_task_failed,
}
# Example DAG: runs every five minutes with a single task that sleeps for
# 300 seconds, i.e. longer than both the 2-minute SLA and the 4-minute
# execution timeout set in default_args.
with DAG(
    dag_id="test_sla",
    default_args=default_args,
    start_date=datetime(2021, 1, 11),
    schedule_interval="*/5 * * * *",
    # The SLA-miss callback is a DAG-level argument: it must be passed here,
    # not through default_args.
    sla_miss_callback=send_slack_alert_sla_miss,
) as dag:
    delay_python_task = PythonOperator(
        task_id="delay_five_minutes_python_task",
        python_callable=lambda: time.sleep(300),
    )