使用 apache 气流,我创建了一些 DAGS,其中一些不按计划运行。
我正在尝试找到一种方法,可以从 Python 脚本中触发特定 DAG 的运行。这可能吗?我能怎么做?
编辑 --- python 脚本将从与我所有 DAGS 所在的项目不同的项目中运行
使用 apache 气流,我创建了一些 DAGS,其中一些不按计划运行。
我正在尝试找到一种方法,可以从 Python 脚本中触发特定 DAG 的运行。这可能吗?我能怎么做?
编辑 --- python 脚本将从与我所有 DAGS 所在的项目不同的项目中运行
在触发 Airflow DAG 运行时,您有多种选择。
气流 python 包提供了一个本地客户端,可用于从 python 脚本中触发 dag。例如:
from airflow.api.client.local_client import Client
c = Client(None, None)
c.trigger_dag(dag_id='test_dag_id', run_id='test_run_id', conf={})
您可以使用 Airflow CLI 手动触发气流中的 dag。可以在此处找到有关如何使用 CLI 触发 DAG 的更多信息。
您还可以使用 Airflow REST api 来触发 DAG 运行。更多信息在这里。
python 中的第一个选项可能最适合您(这也是我过去亲自完成的方式)。但理论上,您可以使用子进程从 python 与 CLI 进行交互,或者使用类似于请求的库从 Python 中与 REST API 进行交互。
在 AWS MWAA Airflow 1.10.12 上,我使用基于boto3Python 和 REST POST 请求的库的解决方案:
import boto3
import requests
def TriggerAirflowDAG(mwaa_environment, dag_id):
client = boto3.client("mwaa")
token = client.create_cli_token(Name=mwaa_environment)
url = "https://{0}/aws_mwaa/cli".format(token["WebServerHostname"])
body = f"trigger_dag {dag_id}"
headers = {
"Authorization": "Bearer " + token["CliToken"],
"Content-Type": "text/plain"
}
return requests.post(url, data=body, headers=headers)
启动 DAG 运行的用户/角色必须具有AmazonMWAAAirflowCliAccess策略。