您是否考虑过在可调用对象周围使用装饰器/高阶函数?
我正在考虑使用以下内容:
def conf_task_id_skip(python_callable):
def skip_if_configured(*args, **context):
task_id = context["task_id"]
dag_run = context["dag_run"]
skip_task_ids = dag_run.conf.get("skip_task_ids", [])
if skip_task_ids and task_id in skip_task_ids:
return None
else:
return python_callable(*args, **context)
return skip_if_configured
PythonOperator(
task_id="task_id",
python_callable=conf_task_id_skip(task_callable)
)
然后,如果我愿意,我可以手动传递我想跳过的任务(并且仍然成功)。
如果您愿意,您还可以通过添加检查是否不允许跳过来增加稳健性(例如在 prod 中):
def conf_task_id_skip(python_callable):
def skip_if_configured(*args, **context):
if Variable.get("disallow_conf_task_id_skip"):
return python_callable(*args, **context)
task_id = context["task_id"]
dag_run = context["dag_run"]
skip_task_ids = dag_run.conf.get("skip_task_ids", [])
if skip_task_ids and task_id in skip_task_ids:
return None
else:
return python_callable(*args, **context)
return skip_if_configured