我想编写一个用于清理 Airflow DAG 的 cleanup DAG,参考了 https://github.com/teamclairvoyant/airflow-maintenance-dags 。我编写的清理 DAG 会获取所有 DAG 并检查它们的调度计划(schedule_interval),如果是 `@once`,就删除该 DAG。但当我运行这个清理 DAG 时,它抛出了 SQLAlchemy 错误(已附截图)。请帮我解决这个错误。
import logging
import os
import os.path
import socket
from datetime import timedelta

import airflow
from airflow import DAG
from airflow import settings
from airflow.models import DagModel
from airflow.operators.python_operator import PythonOperator
# ---------------------------------------------------------------------------
# Cleanup DAG configuration
# ---------------------------------------------------------------------------
DAG_ID = "AIRFLOW_CLEANUP_DAG"
# Start date one day in the past (via airflow.utils.dates.days_ago).
START_DATE = airflow.utils.dates.days_ago(1)
# The cleanup DAG itself is meant to run a single time.
SCHEDULE_INTERVAL = "@once"
DAG_OWNER_NAME = "PDM"
# Recipients for the failure e-mails enabled below.
ALERT_EMAIL_ADDRESSES = []
# Master switch read by the cleanup task: when False it only logs the
# DAGs it would delete.
ENABLE_DELETE = True

# Defaults applied to every task in the cleanup DAG: one retry after a
# one-minute delay, e-mail on failure only.
default_args = dict(
    owner=DAG_OWNER_NAME,
    depends_on_past=False,
    email=ALERT_EMAIL_ADDRESSES,
    email_on_failure=True,
    email_on_retry=False,
    start_date=START_DATE,
    retries=1,
    retry_delay=timedelta(minutes=1),
)
# The cleanup DAG: scheduled by SCHEDULE_INTERVAL, with catch-up of missed
# intervals disabled.
dag = DAG(
    dag_id=DAG_ID,
    description="Performing airflow cleanup",
    default_args=default_args,
    start_date=START_DATE,
    schedule_interval=SCHEDULE_INTERVAL,
    catchup=False,
)

# hasattr guards carried over from the reference maintenance DAGs;
# presumably for Airflow releases that lack these attributes — confirm.
if hasattr(dag, "doc_md"):
    dag.doc_md = __doc__
if hasattr(dag, "catchup"):
    dag.catchup = False
def cleanup_once_dags_fn(**context):
    """Delete the targeted ``@once``-scheduled DAGs from the Airflow metadata DB.

    Loads every ``DagModel`` row and selects those that meet all three
    conditions:

    - ``schedule_interval`` is ``"@once"``;
    - ``dag_id`` is in ``target_dag_ids``;
    - the DAG is not this cleanup DAG itself.

    It logs the selection and, when ``ENABLE_DELETE`` is true, removes each
    selected DAG with Airflow's ``delete_dag`` helper.

    Why not ``session.delete(dag_model)``: that removes only the ``dag`` row
    through the ORM, while other metadata tables (e.g. ``dag_tag``,
    ``dag_run``, ``task_instance``) still reference the same ``dag_id``.
    This is the likely cause of the SQLAlchemy error. For ``dag_tag`` it
    typically reads "Dependency rule tried to blank-out primary key column
    'dag_tag.dag_id'". ``delete_dag`` is Airflow's supported way to remove
    a DAG's metadata records.

    NOTE(review): if the DAG's file is still in the dags folder, the
    scheduler may re-create the DAG on its next parse. Remove the file as
    well for the deletion to stick.

    Args:
        **context: Airflow task context; not used.

    Raises:
        Any exception raised by ``delete_dag`` propagates, so the task fails
        and is retried per ``default_args``.
    """
    # Local imports guarantee DagModel is defined here. They also let the
    # delete_dag import path vary by Airflow version.
    from airflow.models import DagModel

    try:
        from airflow.api.common.delete_dag import delete_dag  # newer Airflow 2.x
    except ImportError:
        from airflow.api.common.experimental.delete_dag import delete_dag  # older releases

    # Only these DAG ids may be removed; widen this set deliberately.
    target_dag_ids = {"ba54206d-078c-42e8-a6b5-ad579e833364"}

    logging.info("Starting to run cleanup Process")
    try:
        host_name = socket.gethostname()
        host_ip = socket.gethostbyname(host_name)
        logging.info("Running on Machine with Host Name: " + host_name)
        logging.info("Running on Machine with IP: " + host_ip)
    except Exception as e:
        # Host info is diagnostic only; never fail the task over it.
        logging.warning("Unable to get Host Name and IP: " + str(e))

    session = settings.Session()
    try:
        logging.info("Configurations:")
        logging.info("enable_delete: " + str(ENABLE_DELETE))
        logging.info("session: " + str(session))
        logging.info("")
        dag_models = session.query(DagModel).all()
        logging.info(f"Total dags :: {len(dag_models)}")
        # Collect plain ids (not ORM objects) so nothing depends on this
        # session once it is closed. The cleanup DAG is also "@once", so it
        # is excluded explicitly to stop it from ever deleting itself.
        entries_to_delete = [
            dag_model.dag_id
            for dag_model in dag_models
            if dag_model.schedule_interval == "@once"
            and dag_model.dag_id in target_dag_ids
            and dag_model.dag_id != DAG_ID
        ]
    finally:
        # Always release the DB connection, even if the query fails.
        session.close()

    logging.info(f"Dags with once schedule {len(entries_to_delete)}")
    logging.info("Process will be Deleting the DAG(s) from the DB:")
    for dag_id in entries_to_delete:
        logging.info("\t" + dag_id)
    logging.info("Process will be Deleting " + str(len(entries_to_delete)) + " DAG(s)")

    if ENABLE_DELETE:
        logging.info("Performing Delete...")
        for dag_id in entries_to_delete:
            logging.info("Deleting DAG: " + dag_id)
            # Called without a session argument, so delete_dag manages
            # (and commits) its own session.
            delete_dag(dag_id)
        logging.info("Finished Performing Delete")
    else:
        logging.warning("You're opted to skip deleting the DAG entries!!!")
    logging.info("Finished Running Clear Process")
# The single task of the cleanup DAG; it invokes cleanup_once_dags_fn.
# provide_context=True passes the task context as keyword arguments
# (Airflow 1.10 behaviour; presumably a deprecated no-op on 2.x — confirm).
cleanup_once_dags = PythonOperator(
    dag=dag,
    task_id="cleanup_once_dags",
    python_callable=cleanup_once_dags_fn,
    provide_context=True,
)