我很难理解 Airflow 中的 BranchPythonOperator 是如何工作的。我知道它主要用于分支,但是文档对传递给任务的内容以及我需要从上游任务传递/期望的内容感到困惑。
鉴于本页run_this_first
文档中的简单示例,上游任务调用的源代码和分支的 2 个下游任务的源代码是什么样的?Airflow 究竟是如何知道运行branch_a
而不是运行的branch_b
?上游任务的输出在哪里得到注意/读取?
我很难理解 Airflow 中的 BranchPythonOperator 是如何工作的。我知道它主要用于分支,但是文档对传递给任务的内容以及我需要从上游任务传递/期望的内容感到困惑。
鉴于本页run_this_first
文档中的简单示例,上游任务调用的源代码和分支的 2 个下游任务的源代码是什么样的?Airflow 究竟是如何知道运行branch_a
而不是运行的branch_b
?上游任务的输出在哪里得到注意/读取?
您的 BranchPythonOperator 是用一个创建的python_callable
,这将是一个函数。该函数应根据您的业务逻辑返回您已连接的直接下游任务的任务名称。这可能是下游的 1 到 N 个任务。下游任务无需读取任何内容,但是您可以使用 xcom 将元数据传递给它们。
def decide_which_path():
if something is True:
return "branch_a"
else:
return "branch_b"
branch_task = BranchPythonOperator(
task_id='run_this_first',
python_callable=decide_which_path,
trigger_rule="all_done",
dag=dag)
branch_task.set_downstream(branch_a)
branch_task.set_downstream(branch_b)
重要的是设置trigger_rule
将跳过其余部分或全部,默认为all_success
.