3

我们有一个巨大的 DAG,有许多小而快的任务和一些大而耗时的任务。

我们只想运行 DAG 的一部分,而我们发现的最简单的方法是不添加我们不想运行的任务。问题是我们的 DAG 有很多共同依赖,所以当我们想要跳过一些任务时,不破坏 dag 成为一个真正的挑战。

有没有办法默认为任务添加状态?(对于每次运行),例如:

# get the skip list from a env variable    
task_list = models.Variable.get('list_of_tasks_to_skip')

dag.skip(task_list)

或者

for task in task_list:
    task.status = 'success'
4

2 回答 2

2

如评论中所述,您应该使用BranchPythonOperator( 或ShortCircuitOperator) 来防止执行耗时的任务。如果您需要运行这些耗时任务的下游算子,您可以使用TriggerRule.ALL_DONE让这些算子运行,但请注意,即使上游算子失败,它也会运行。

您可以使用气流变量来影响这些BranchPythonOperators,而无需更新 DAG,例如:

from airflow.models import Variable

def branch_python_operator_callable()
  return Variable.get('time_consuming_operator_var')

branch_python_operator_callable用作 BranchPythonOperator 的 Python 可调用对象。

于 2018-06-19T22:26:27.473 回答
0

您是否考虑过在可调用对象周围使用装饰器/高阶函数?

我正在考虑使用以下内容:

def conf_task_id_skip(python_callable):
    def skip_if_configured(*args, **context):
        task_id = context["task_id"]
        dag_run = context["dag_run"]
        skip_task_ids = dag_run.conf.get("skip_task_ids", [])

        if skip_task_ids and task_id in skip_task_ids:
            return None
        else:
            return python_callable(*args, **context)
    
    return skip_if_configured
PythonOperator(
    task_id="task_id", 
    python_callable=conf_task_id_skip(task_callable)
)

然后,如果我愿意,我可以手动传递我想跳过的任务(并且仍然成功)。

如果您愿意,您还可以通过添加检查是否不允许跳过来增加稳健性(例如在 prod 中):

def conf_task_id_skip(python_callable):
    def skip_if_configured(*args, **context):
        if Variable.get("disallow_conf_task_id_skip"):
            return python_callable(*args, **context)

        task_id = context["task_id"]
        dag_run = context["dag_run"]
        skip_task_ids = dag_run.conf.get("skip_task_ids", [])

        if skip_task_ids and task_id in skip_task_ids:
            return None
        else:
            return python_callable(*args, **context)
    
    return skip_if_configured
于 2021-07-05T22:10:51.303 回答