1

在 Airflow 2 任务流 API 中,我可以使用以下代码示例轻松地在任务之间推送和拉取 XCom 值:-

@task(task_id="task_one")
    def get_height() -> int:
        response = requests.get("https://swapi.dev/api/people/4")
        data = json.loads(response.text)
        height = int(data["height"])
        return height

    @task(task_id="task_two")
    def check_height(val):
        # Show val:
        print(f"Value passed in is: {val}")

    check_height(get_height())

我可以看到传递给 check_height 的 val 是 202,并且包含在 xcom 默认键“return_value”中,这在某些时候很好,但我通常更喜欢使用特定的键。

我的问题是如何使用命名键推动 XCom?这在以前使用 ti.xcom_push 非常简单,您可以在其中提供您希望将值填充到其中的键名,但我不能完全说明如何在任务流 api 工作流中实现这一点。

将不胜感激任何指针或(简单,请!)如何做到这一点的例子。

4

1 回答 1

1

您可以ti在装饰器中设置为:

@task(task_id="task_one", ti)
def get_height() -> int:
    response = requests.get("https://swapi.dev/api/people/4")
    data = json.loads(response.text)
    height = int(data["height"])

    # Handle named Xcom
    ti.xcom_push("my_key", height)

对于需要深度函数上下文的情况,您也可以使用get_current_context. 我将在下面的示例中使用它来展示它,但在您的情况下它并不是真正需要的。

这是一个工作示例:

import json
from datetime import datetime

import requests

from airflow.decorators import dag, task
from airflow.operators.python import get_current_context

DEFAULT_ARGS = {"owner": "airflow"}


@dag(dag_id="stackoverflow_dag", default_args=DEFAULT_ARGS, schedule_interval=None, start_date=datetime(2020, 2, 2))
def my_dag():

    @task(task_id="task_one")
    def get_height() -> int:
        response = requests.get("https://swapi.dev/api/people/4")
        data = json.loads(response.text)
        height = int(data["height"])

        # Handle named Xcom
        context = get_current_context()
        ti = context["ti"]
        ti.xcom_push("my_key", height)

        return height

    @task(task_id="task_two")
    def check_height(val):
        # Show val:
        print(f"Value passed in is: {val}")

        #Read from named Xcom
        context = get_current_context()
        ti = context["ti"]
        ti.xcom_pull("task_one")
        print(f"Value passed from xcom my_key is: {val}")

    check_height(get_height())

my_dag = my_dag()

两个 xcom 被推送(一个用于返回值,一个用于我们选择的键): 在此处输入图像描述

在下游打印两个 xcom task_two

在此处输入图像描述

于 2021-10-03T11:37:05.193 回答