1

我想写下气流 dags cleanup dag。我' https://github.com/teamclairvoyant/airflow-maintenance-dags供参考。我编写了一个清理 DAG,它获取所有 dag 并检查它们的时间表,如果是一次,它会删除 DAG。当我尝试运行清理 dag 时,它会抛出 sqlchemy 错误(已附加屏幕截图)。请帮我解决这个错误。

from airflow.operators.python_operator import PythonOperator
from airflow import settings
from datetime import timedelta
import os
import os.path
import socket
import logging
import airflow


DAG_ID = "AIRFLOW_CLEANUP_DAG"
START_DATE = airflow.utils.dates.days_ago(1)
SCHEDULE_INTERVAL = "@once"
DAG_OWNER_NAME = "PDM"
ALERT_EMAIL_ADDRESSES = []

ENABLE_DELETE = True

default_args = {
    "owner": DAG_OWNER_NAME,
    "depends_on_past": False,
    "email": ALERT_EMAIL_ADDRESSES,
    "email_on_failure": True,
    "email_on_retry": False,
    "start_date": START_DATE,
    "retries": 1,
    "retry_delay": timedelta(minutes=1),
}

dag = DAG(
    DAG_ID,
    default_args=default_args,
    schedule_interval=SCHEDULE_INTERVAL,
    start_date=START_DATE,
    catchup=False,
    description="Performing airflow cleanup",
)

if hasattr(dag, "doc_md"):
    dag.doc_md = __doc__
if hasattr(dag, "catchup"):
    dag.catchup = False


def cleanup_once_dags_fn(**context):
    logging.info("Starting to run cleanup Process")

    try:
        host_name = socket.gethostname()
        host_ip = socket.gethostbyname(host_name)
        logging.info("Running on Machine with Host Name: " + host_name)
        logging.info("Running on Machine with IP: " + host_ip)
    except Exception as e:
        print("Unable to get Host Name and IP: " + str(e))

    session = settings.Session()

    logging.info("Configurations:")
    logging.info("enable_delete:            " + str(ENABLE_DELETE))
    logging.info("session:                  " + str(session))
    logging.info("")

    dags = session.query(DagModel).all()
    entries_to_delete = []
    logging.info(f"Total dags :: {len(dags)}")
    for dag in dags:
        dag_schedule = dag.schedule_interval
        if dag_schedule == "@once" and dag.dag_id=='ba54206d-078c-42e8-a6b5-ad579e833364':
            entries_to_delete.append(dag)
    logging.info(f"Dags with once schedule {len(entries_to_delete)}")

    logging.info("Process will be Deleting the DAG(s) from the DB:")

    logging.info("Process will be Deleting " + str(len(entries_to_delete)) + " DAG(s)")

    if ENABLE_DELETE:
        logging.info("Performing Delete...")
        for entry in entries_to_delete:
            session.delete(entry)
        session.commit()
        logging.info("Finished Performing Delete")
    else:
        logging.warn("You're opted to skip deleting the DAG entries!!!")

    logging.info("Finished Running Clear Process")


cleanup_once_dags = PythonOperator(
    task_id="cleanup_once_dags", python_callable=cleanup_once_dags_fn, provide_context=True, dag=dag
)

在此处输入图像描述

在此处输入图像描述

4

0 回答 0