15

我有一个 dag,我们将部署到多个不同的气流实例,并且在 airflow.cfg 中我们有dags_are_paused_at_creation = True,但是对于这个特定的 dag,我们希望无需通过单击 UI 手动打开它。有没有办法以编程方式做到这一点?

4

6 回答 6

27

如果其他人遇到此问题,我创建了以下函数:

import airflow.settings
from airflow.models import DagModel
def unpause_dag(dag):
    """
    A way to programatically unpause a DAG.
    :param dag: DAG object
    :return: dag.is_paused is now False
    """
    session = airflow.settings.Session()
    try:
        qry = session.query(DagModel).filter(DagModel.dag_id == dag.dag_id)
        d = qry.first()
        d.is_paused = False
        session.commit()
    except:
        session.rollback()
    finally:
        session.close()
于 2017-06-05T02:44:29.093 回答
8

提供您的 dag_id 并在命令行上运行此命令。

airflow pause dag_id.

有关气流命令行界面的更多信息:https ://airflow.incubator.apache.org/cli.html

于 2017-06-05T08:19:27.033 回答
8

airflow-rest-api-plugin 插件也可用于以编程方式暂停任务。

暂停 DAG

可用于 Airflow 版本:1.7.0 或更高版本

GET - http://{HOST}:{PORT}/admin/rest_api/api?api=pause

查询参数:

dag_id - 字符串 - dag 的 id

subdir (optional) - string - 要从中查找 dag 的文件位置或目录

例子:

http://{HOST}:{PORT}/admin/rest_api/api?api=pause&dag_id=test_id

有关更多详细信息,请参阅: https ://github.com/teamclairvoyant/airflow-rest-api-plugin

于 2018-01-01T00:31:36.357 回答
2

我想你正在寻找unpause(不是pause

airflow unpause DAG_ID
于 2019-05-03T12:08:25.000 回答
0

以下 cli 命令应该根据最近的文档工作。

airflow dags unpause dag_id

https://airflow.apache.org/docs/apache-airflow/stable/cli-and-env-variables-ref.html#unpause

于 2021-07-04T14:22:18.480 回答
0

Airflow 的 REST API 提供了一种使用 DAG 补丁 API 的方法:我们需要使用查询参数更新 dag,?update_mask=is_paused并将布尔值作为请求正文发送。

参考:https ://airflow.apache.org/docs/apache-airflow/stable/stable-rest-api-ref.html#operation/patch_dag

于 2021-12-17T07:04:21.450 回答