4

GCS/dags我想从 Airflow UI 中删除一个 DAG,该 DAG 在文件夹中不再可用。我知道 Airflow 有一种“新”方法可以使用 airflow delete_dag my_dag_id命令从数据库中删除 dag,见https://stackoverflow.com/a/49683543/5318634

似乎在作曲家气流版本中,该delete_dag命令尚不受支持。

不要尝试这个: 我也尝试过使用airflow resetdb并且气流 UI 死了

有没有办法删除当前不在gs://BUCKET/dags/文件夹中的 dag?

4

3 回答 3

8

我创建了一个 DAG 来清理 UI。afDagID它从气流变量中读取

from airflow import DAG
from airflow import models
from airflow.operators.mysql_operator import MySqlOperator
import logging
from datetime import datetime
from airflow.operators.dummy_operator import DummyOperator

dag = DAG('ManageAirFlow', description='Deletes Airflow DAGs from backend: Uses vars-  afDagID',
      schedule_interval=None,
      start_date=datetime(2018, 3, 20), catchup=False)

DeleteXComOperator = MySqlOperator(
  task_id='delete-xcom-record-task',
  mysql_conn_id='airflow_db',
  sql="DELETE from xcom where dag_id='{}'".format(models.Variable.get('afDagID')),
  dag=dag)

DeleteTaskOperator = MySqlOperator(
  task_id='delete-task-record-task',
  mysql_conn_id='airflow_db',
  sql="DELETE from task_instance where dag_id='{}'".format(models.Variable.get('afDagID')),
  dag=dag)

DeleteSLAMissOperator = MySqlOperator(
  task_id='delete-sla-record-task',
  mysql_conn_id='airflow_db',
  sql="DELETE from sla_miss where dag_id='{}'".format(models.Variable.get('afDagID')),
  dag=dag)

DeleteLogOperator = MySqlOperator(
  task_id='delete-log-record-task',
  mysql_conn_id='airflow_db',
  sql="DELETE from log where dag_id='{}'".format(models.Variable.get('afDagID')),
  dag=dag)

DeleteJobOperator = MySqlOperator(
  task_id='delete-job-record-task',
  mysql_conn_id='airflow_db',
  sql="DELETE from job where dag_id='{}'".format(models.Variable.get('afDagID')),
  dag=dag)

DeleteDagRunOperator = MySqlOperator(
  task_id='delete-dag_run-record-task',
  mysql_conn_id='airflow_db',
  sql="DELETE from dag_run where dag_id='{}'".format(models.Variable.get('afDagID')),
  dag=dag)

DeleteDagOperator = MySqlOperator(
  task_id='delete-dag-record-task',
  mysql_conn_id='airflow_db',
  sql="DELETE from dag where dag_id='{}'".format(models.Variable.get('afDagID')),
  dag=dag)



DeleteXComOperator >> DeleteTaskOperator >> DeleteSLAMissOperator >> DeleteLogOperator >> DeleteJobOperator >> DeleteDagRunOperator >> DeleteDagOperator
于 2018-06-15T15:08:41.877 回答
2

由于 cloud composer 使用的是最新的稳定版本 ie 1.9.0,所以不提供删除 dag 的功能。

然而,

文档中几乎没有删除 dag 的说明,如下所示:

 gcloud beta composer environments storage dags delete \
     --environment ENVIRONMENT_NAME \
     --location LOCATION \
     DAG_NAME.py 

但不幸的是,这不会从 Airflow Web 界面中删除 DAG。

更多信息:https ://cloud.google.com/composer/docs/how-to/using/managing-dags#deleting_a_dag

于 2018-05-31T21:13:04.797 回答
0

它将分两步进行:- step1:-

首先,您必须使用存储桶中的命令删除airflow_monitoring.py文件。

gcloud composer 环境存储 dags 删除 --environment viu-etl-prod-composer --location us-central1 airflow_monitoring.py

第 2 步:-
点击如图所示的红叉检查。

在此处输入图像描述

于 2020-04-25T08:00:05.743 回答