我主要看到 Airflow 用于 ETL/投标数据相关的工作。我正在尝试将它用于业务工作流程,其中用户操作将来会触发一组相关任务。其中一些任务可能需要根据某些其他用户操作来清除(删除)。我认为处理此问题的最佳方法是通过动态任务 ID。我读到 Airflow 支持动态 dag id。因此,我创建了一个简单的 python 脚本,它将 DAG id 和任务 id 作为命令行参数。但是,我遇到了使它工作的问题。它给出了 dag_id not found 错误。有没有人试过这个?这是我在命令行上作为 python (python tmp.py 820 2016-08-24T22:50:00) 执行的脚本代码(称为 tmp.py):
from __future__ import print_function
import os
import sys
import shutil
from datetime import date, datetime, timedelta
from airflow import DAG
from airflow.operators.bash_operator import BashOperator
execution = '2016-08-24T22:20:00'
if len(sys.argv) > 2 :
dagid = sys.argv[1]
taskid = 'Activate' + sys.argv[1]
execution = sys.argv[2]
else:
dagid = 'DAGObjectId'
taskid = 'Activate'
default_args = {'owner' : 'airflow', 'depends_on_past': False, 'start_date':date.today(), 'email': ['fake@fake.com'], 'email_on_failure': False, 'email_on_retry': False, 'retries': 1}
dag = DAG(dag_id = dagid,
default_args=default_args,
schedule_interval='@once',
)
globals()[dagid] = dag
task1 = BashOperator(
task_id = taskid,
bash_command='ls -l',
dag=dag)
fakeTask = BashOperator(
task_id = 'fakeTask',
bash_command='sleep 5',
retries = 3,
dag=dag)
task1.set_upstream(fakeTask)
airflowcmd = "airflow run " + dagid + " " + taskid + " " + execution
print("airflowcmd = " + airflowcmd)
os.system(airflowcmd)