2

我有一个 DAG,它是通过向 DynamoDB 查询列表来创建的,对于列表中的每个项目,使用 PythonOperator 创建一个任务并将其添加到 DAG。未在下面的示例中显示,但重要的是要注意列表中的某些项目依赖于其他任务,因此我使用set_upstream它来强制执行依赖关系。

- airflow_home
  \- dags
    \- workflow.py

工作流.py

def get_task_list():
    # ... query dynamodb ...

def run_task(task):
    # ... do stuff ...

dag = DAG(dag_id='my_dag', ...)
tasks = get_task_list()
for task in tasks:
    t = PythonOperator(
        task_id=task['id'],
        provide_context=False,
        dag=dag,
        python_callable=run_task,
        op_args=[task]
    )

问题是workflow.py一遍又一遍地运行(每次任务运行时?),我的get_task_list()方法被 AWS 限制并抛出异常。

我认为这是因为无论何时run_task()调用它都会运行所有全局变量,workflow.py所以我尝试移动run_task()到一个单独的模块中,如下所示:

- airflow_home
  \- dags
    \- workflow.py
    \- mypackage
      \- __init__
      \- task.py

但这并没有改变什么。我什至尝试放入get_task_list()一个用工厂函数包装的 SubDagOperator,它的行为仍然相同。

我的问题与这些问题有关吗?

此外,为什么workflow.py如此频繁地运行以及为什么get_task_list()当任务方法不引用workflow.py并且不依赖于它时导致单个任务失败时引发的错误?

最重要的是,并行处理列表并强制列表中项目之间的任何依赖关系的最佳方法是什么?

4

1 回答 1

5

根据您引用的问题,气流不支持在 dag 运行时创建任务。

因此,气流会在开始运行之前定期生成完整的 DAG 定义。理想情况下,这种生成的周期应该与该 DAG 的调度间隔相同。

但是可能每次气流检查 dag 的变化时,它也在生成完整的 dag,导致请求过多。该时间使用气流.cfg 中的配置 min_file_process_interval 和 dag_dir_list_interval 控制。

关于任务失败,它们失败是因为 dag 创建本身失败并且气流无法启动它们。

于 2017-07-15T15:59:50.290 回答