我有一个 DAG,它是通过向 DynamoDB 查询列表来创建的,对于列表中的每个项目,使用 PythonOperator 创建一个任务并将其添加到 DAG。未在下面的示例中显示,但重要的是要注意列表中的某些项目依赖于其他任务,因此我使用set_upstream
它来强制执行依赖关系。
- airflow_home
\- dags
\- workflow.py
工作流.py
def get_task_list():
# ... query dynamodb ...
def run_task(task):
# ... do stuff ...
dag = DAG(dag_id='my_dag', ...)
tasks = get_task_list()
for task in tasks:
t = PythonOperator(
task_id=task['id'],
provide_context=False,
dag=dag,
python_callable=run_task,
op_args=[task]
)
问题是workflow.py
一遍又一遍地运行(每次任务运行时?),我的get_task_list()
方法被 AWS 限制并抛出异常。
我认为这是因为无论何时run_task()
调用它都会运行所有全局变量,workflow.py
所以我尝试移动run_task()
到一个单独的模块中,如下所示:
- airflow_home
\- dags
\- workflow.py
\- mypackage
\- __init__
\- task.py
但这并没有改变什么。我什至尝试放入get_task_list()
一个用工厂函数包装的 SubDagOperator,它的行为仍然相同。
我的问题与这些问题有关吗?
此外,为什么workflow.py
如此频繁地运行以及为什么get_task_list()
当任务方法不引用workflow.py
并且不依赖于它时导致单个任务失败时引发的错误?
最重要的是,并行处理列表并强制列表中项目之间的任何依赖关系的最佳方法是什么?