可以从以前的任务生成的任务中动态创建任务XComs
,关于这个主题有更广泛的讨论,例如在这个问题中。建议的方法之一遵循这种结构,这是我制作的一个工作示例:
示例文件.json:
{
"cities": [ "London", "Paris", "BA", "NY" ]
}
- 从 API 或文件或任何来源获取数据。将其推为
XCom
.
def _process_obtained_data(ti):
list_of_cities = ti.xcom_pull(task_ids='get_data')
Variable.set(key='list_of_cities',
value=list_of_cities['cities'], serialize_json=True)
def _read_file():
with open('dags/sample_file.json') as f:
data = json.load(f)
# push to XCom using return
return data
with DAG('dynamic_tasks_example', schedule_interval='@once',
start_date=days_ago(2),
catchup=False) as dag:
get_data = PythonOperator(
task_id='get_data',
python_callable=_read_file)
- 添加第二个任务,该任务将从 pull from 中提取,并使用您稍后将用于迭代的数据
XCom
设置 a 。Variable
preparation_task = PythonOperator(
task_id='preparation_task',
python_callable=_process_obtained_data)
*当然,如果您愿意,您可以将两个任务合并为一个。我不喜欢这样做,因为通常我会使用获取的数据的一个子集来创建Variable
.
- 从中读取,
Variable
然后对其进行迭代。定义default_var
. _ _
end = DummyOperator(
task_id='end',
trigger_rule='none_failed')
# Top-level code within DAG block
iterable_list = Variable.get('list_of_cities',
default_var=['default_city'],
deserialize_json=True)
- 在循环中声明动态任务及其依赖关系。做
task_id
独一无二的。TaskGroup
是可选的,帮助您对 UI 进行排序。
with TaskGroup('dynamic_tasks_group',
prefix_group_id=False,
) as dynamic_tasks_group:
if iterable_list:
for index, city in enumerate(iterable_list):
say_hello = PythonOperator(
task_id=f'say_hello_from_{city}',
python_callable=_print_greeting,
op_kwargs={'city_name': city, 'greeting': 'Hello'}
)
say_goodbye = PythonOperator(
task_id=f'say_goodbye_from_{city}',
python_callable=_print_greeting,
op_kwargs={'city_name': city, 'greeting': 'Goodbye'}
)
# TaskGroup level dependencies
say_hello >> say_goodbye
# DAG level dependencies
get_data >> preparation_task >> dynamic_tasks_group >> end
DAG 图形视图:

进口:
import json
from airflow import DAG
from airflow.utils.dates import days_ago
from airflow.models import Variable
from airflow.operators.python_operator import PythonOperator
from airflow.operators.dummy import DummyOperator
from airflow.utils.task_group import TaskGroup
要记住的事情:
- 如果您同时有相同的 dag_runs,
DAG
它们都将使用相同的变量,因此您可能需要通过区分它们的名称来使其“唯一”。
- 读取时必须设置默认值
Variable
,否则第一次执行可能无法处理到Scheduler
.
- Airflow Graph View UI 可能不会立即刷新更改。特别是在从创建动态任务生成的迭代中添加或删除项目之后的第一次运行中。
- 如果您需要读取多个变量,请务必记住,建议将它们存储在一个 JSON 值中,以避免不断创建与元数据数据库的连接(本文中的示例)。
祝你好运!
编辑:
另一个需要考虑的重要点:
- 使用这种方法,对
Variable.get()
方法的调用是顶级代码,因此调度程序每 30 秒读取一次(默认min_file_process_interval
设置)。这意味着每次都会发生与元数据数据库的连接。
编辑:
- 添加了 if 子句来处理空的
iterable_list
情况。