1

现在,我使用这样的变量创建多个任务,它工作正常。

with DAG(....) as dag:
    body = Variable.get("config_table", deserialize_json=True)
    for i in range(len(body.keys())):
        simple_task = Operator(
            task_id = 'task_' + str(i),
            .....

但是出于某种原因我需要使用 XCOM 值而不是使用变量。是否可以动态创建具有 XCOM 拉值的任务?

我尝试像这样设置值但它不起作用

body = "{{ ti.xcom_pull(key='config_table', task_ids='get_config_table') }}"

4

2 回答 2

2

可以从以前的任务生成的任务中动态创建任务XComs,关于这个主题有更广泛的讨论,例如在这个问题中。建议的方法之一遵循这种结构,这是我制作的一个工作示例:

示例文件.json:

{
    "cities": [ "London", "Paris", "BA", "NY" ]
}
  • 从 API 或文件或任何来源获取数据。将其推为XCom.

def _process_obtained_data(ti):
    list_of_cities = ti.xcom_pull(task_ids='get_data')
    Variable.set(key='list_of_cities',
                 value=list_of_cities['cities'], serialize_json=True)

def _read_file():
    with open('dags/sample_file.json') as f:
        data = json.load(f)
        # push to XCom using return
        return data


with DAG('dynamic_tasks_example', schedule_interval='@once',
         start_date=days_ago(2),
         catchup=False) as dag:

    get_data = PythonOperator(
        task_id='get_data',
        python_callable=_read_file)
  • 添加第二个任务,该任务将从 pull from 中提取,并使用您稍后将用于迭代的数据XCom设置 a 。Variable
    preparation_task = PythonOperator(
        task_id='preparation_task',
        python_callable=_process_obtained_data)

*当然,如果您愿意,您可以将两个任务合并为一个。我不喜欢这样做,因为通常我会使用获取的数据的一个子集来创建Variable.

  • 从中读取,Variable然后对其进行迭代。定义default_var. _ _
    end = DummyOperator(
        task_id='end',
        trigger_rule='none_failed')

    # Top-level code within DAG block
    iterable_list = Variable.get('list_of_cities',
                                 default_var=['default_city'],
                                 deserialize_json=True)
  • 在循环中声明动态任务及其依赖关系。做task_id独一无二的。TaskGroup是可选的,帮助您对 UI 进行排序。

    with TaskGroup('dynamic_tasks_group',
                   prefix_group_id=False,
                   ) as dynamic_tasks_group:
        if iterable_list:
            for index, city in enumerate(iterable_list):
                say_hello = PythonOperator(
                    task_id=f'say_hello_from_{city}',
                    python_callable=_print_greeting,
                    op_kwargs={'city_name': city, 'greeting': 'Hello'}
                )
                say_goodbye = PythonOperator(
                    task_id=f'say_goodbye_from_{city}',
                    python_callable=_print_greeting,
                    op_kwargs={'city_name': city, 'greeting': 'Goodbye'}
                )

                # TaskGroup level dependencies
                say_hello >> say_goodbye

# DAG level dependencies
get_data >> preparation_task >> dynamic_tasks_group >> end

DAG 图形视图:

用户界面中的 DAG

进口:

import json
from airflow import DAG
from airflow.utils.dates import days_ago
from airflow.models import Variable
from airflow.operators.python_operator import PythonOperator
from airflow.operators.dummy import DummyOperator
from airflow.utils.task_group import TaskGroup

要记住的事情:

  • 如果您同时有相同的 dag_runs,DAG它们都将使用相同的变量,因此您可能需要通过区分它们的名称来使其“唯一”。
  • 读取时必须设置默认值Variable,否则第一次执行可能无法处理到Scheduler.
  • Airflow Graph View UI 可能不会立即刷新更改。特别是在从创建动态任务生成的迭代中添加或删除项目之后的第一次运行中。
  • 如果您需要读取多个变量,请务必记住,建议将它们存储在一个 JSON 值中,以避免不断创建与元数据数据库的连接(本文中的示例)。

祝你好运!

编辑:

另一个需要考虑的重要点:

  • 使用这种方法,对Variable.get()方法的调用是顶级代码,因此调度程序每 30 秒读取一次(默认min_file_process_interval设置)。这意味着每次都会发生与元数据数据库的连接。

编辑:

  • 添加了 if 子句来处理空的iterable_list情况。
于 2021-04-01T15:55:31.167 回答
1

这是不可能的,通常不建议使用动态任务:

  1. Airflow 调度器的工作方式是通过读取 dag 文件,将任务加载到内存中,然后检查需要调度哪些 dag 和哪些任务,而 xcom 是与特定 dag 运行相关的运行时值,因此调度器不能中继 xcom 值。
  2. 使用动态任务时,您自己的调试工作会变得更加困难,因为您用于创建 dag 的值可能会发生变化,您甚至会在不了解原因的情况下无法访问日志。

您可以做的是使用分支运算符,始终执行这些任务并根据 xcom 值跳过它们。例如:

def branch_func(**context)
    return f"task_{context['ti'].xcom_pull(key=key)}"


branch = BranchPythonOperator(
    task_id="branch",
    python_callback=branch_func
)

tasks = [BaseOperator(task_id=f"task_{i}") for i in range(3)]
branch >> tasks

在某些情况下,使用这种方法也不好(例如,当我有 100 个可能的任务时),在这些情况下,我建议编写自己的运算符或使用单个 PythonOperator。

于 2021-03-26T17:37:58.820 回答