有这样一个 DAG:
import os
from datetime import timedelta
from xxx import on_failure_opsgenie
from airflow import DAG
from airflow.operators.python import PythonOperator
from airflow.utils.dates import days_ago
# Use this file's name without its extension as the DAG id. splitext removes
# only the trailing extension, whereas str.replace(".py", "") would also strip
# any ".py" that appears earlier in the file name.
DAG_ID = os.path.splitext(os.path.basename(__file__))[0]

# Passed to the DAG below as default_args.
DEFAULT_ARGS = {
    "owner": "airflow",
    "depends_on_past": False,
    "email": ["airflow@example.com"],
    "email_on_failure": False,
    "email_on_retry": False,
}
def kaboom(*args, **kwargs):
    """Print the received arguments, then fail on purpose.

    Args:
        *args: Positional arguments from the caller; printed as a tuple.
        **kwargs: Keyword arguments from the caller; printed as a dict.

    Raises:
        AssertionError: Always, after the three print calls.
    """
    print("goodbye cruel world")
    print(args)
    print(kwargs)
    # Raise explicitly rather than `assert 1 == 2`: assert statements are
    # removed when Python runs with -O, which would let this call succeed.
    raise AssertionError
# DAG with a single PythonOperator task that calls kaboom, with
# on_failure_opsgenie registered as the DAG-level failure callback.
with DAG(
    dag_id=DAG_ID,
    default_args=DEFAULT_ARGS,
    # NOTE(review): this description mentions airflow.cfg, but the only task
    # calls kaboom — looks copied from another DAG; text left unchanged.
    description="Print contents of airflow.cfg to logs",
    dagrun_timeout=timedelta(hours=2),
    start_date=days_ago(1),
    # No automatic schedule; per Airflow, such a DAG runs only when triggered.
    schedule_interval=None,
    # NOTE(review): per the Airflow callback docs, a DAG-level
    # on_failure_callback is presumably executed outside the task process, so
    # its output is not expected in the task log — confirm by checking the
    # scheduler / DAG-processing logs of the environment.
    on_failure_callback=on_failure_opsgenie,
) as dag:
    get_airflow_cfg_operator = PythonOperator(
        task_id="gonna_explode",
        python_callable=kaboom,
    )
该 DAG 会按预期故意失败。但是,on_failure_opsgenie 并没有完成它应做的工作。
在 AWS MWAA 中,如何获取日志,或者如何调试执行失败的 on_failure_callback?