在 Airflow 2 任务流 API 中,我可以使用以下代码示例轻松地在任务之间推送和拉取 XCom 值:-
@task(task_id="task_one")
def get_height() -> int:
response = requests.get("https://swapi.dev/api/people/4")
data = json.loads(response.text)
height = int(data["height"])
return height
@task(task_id="task_two")
def check_height(val):
# Show val:
print(f"Value passed in is: {val}")
check_height(get_height())
我可以看到传递给 check_height 的 val 是 202,并且包含在 xcom 默认键“return_value”中,这在某些时候很好,但我通常更喜欢使用特定的键。
我的问题是如何使用命名键推动 XCom?这在以前使用 ti.xcom_push 非常简单,您可以在其中提供您希望将值填充到其中的键名,但我不能完全说明如何在任务流 api 工作流中实现这一点。
将不胜感激任何指针或(简单,请!)如何做到这一点的例子。