我有一个 dag,我们将部署到多个不同的气流实例,并且在 airflow.cfg 中我们有dags_are_paused_at_creation = True
,但是对于这个特定的 dag,我们希望无需通过单击 UI 手动打开它。有没有办法以编程方式做到这一点?
6 回答
如果其他人遇到此问题,我创建了以下函数:
import airflow.settings
from airflow.models import DagModel
def unpause_dag(dag):
"""
A way to programatically unpause a DAG.
:param dag: DAG object
:return: dag.is_paused is now False
"""
session = airflow.settings.Session()
try:
qry = session.query(DagModel).filter(DagModel.dag_id == dag.dag_id)
d = qry.first()
d.is_paused = False
session.commit()
except:
session.rollback()
finally:
session.close()
提供您的 dag_id 并在命令行上运行此命令。
airflow pause dag_id.
有关气流命令行界面的更多信息:https ://airflow.incubator.apache.org/cli.html
airflow-rest-api-plugin 插件也可用于以编程方式暂停任务。
暂停 DAG
可用于 Airflow 版本:1.7.0 或更高版本
GET - http://{HOST}:{PORT}/admin/rest_api/api?api=pause
查询参数:
dag_id - 字符串 - dag 的 id
subdir (optional) - string - 要从中查找 dag 的文件位置或目录
例子:
http://{HOST}:{PORT}/admin/rest_api/api?api=pause&dag_id=test_id
有关更多详细信息,请参阅: https ://github.com/teamclairvoyant/airflow-rest-api-plugin
我想你正在寻找unpause
(不是pause
)
airflow unpause DAG_ID
以下 cli 命令应该根据最近的文档工作。
airflow dags unpause dag_id
https://airflow.apache.org/docs/apache-airflow/stable/cli-and-env-variables-ref.html#unpause
Airflow 的 REST API 提供了一种使用 DAG 补丁 API 的方法:我们需要使用查询参数更新 dag,?update_mask=is_paused
并将布尔值作为请求正文发送。
参考:https ://airflow.apache.org/docs/apache-airflow/stable/stable-rest-api-ref.html#operation/patch_dag