我想在 DAG A完成执行时清除 DAG B 中的任务。A 和 B 都是已调度的DAG。
是否有任何operator/方式来清除任务状态并以编程方式重新运行 DAG B?
我知道用于清除任务的CLI 选项和 Web UI 选项。
我想在 DAG A完成执行时清除 DAG B 中的任务。A 和 B 都是已调度的DAG。
是否有任何operator/方式来清除任务状态并以编程方式重新运行 DAG B?
我知道用于清除任务的CLI 选项和 Web UI 选项。
我建议在这里远离 CLI!
与通过 BashOperator 和/或 CLI 模块相比,在引用对象时,dags/tasks 的气流功能会更好地暴露出来。
将 python 操作添加到名为“clear_dag_b”的dag A ,它从dags文件夹(模块)导入 dag_b,并且:
from dags.dag_b import dag as dag_b
def clear_dag_b(**context):
exec_date = context[some date object, I forget the name]
dag_b.clear(start_date=exec_date, end_date=exec_date)
重要的!如果您出于某种原因不匹配或重叠dag_b 调度时间与 start_date/end_date,则 clear() 操作将错过 dag 执行。此示例假定 dag A和B的计划相同,并且您只想在A执行第X天时从B清除第X天
在清除之前检查 dag_b 是否已经运行可能是有意义的:
dab_b_run = dag_b.get_dagrun(exec_date) # returns None or a dag_run object
@cli_utils.action_logging def clear(args): logging.basicConfig( level=settings.LOGGING_LEVEL, format=settings.SIMPLE_LOG_FORMAT) dags = get_dags(args) if args.task_regex: for idx, dag in enumerate(dags): dags[idx] = dag.sub_dag( task_regex=args.task_regex, include_downstream=args.downstream, include_upstream=args.upstream) DAG.clear_dags( dags, start_date=args.start_date, end_date=args.end_date, only_failed=args.only_failed, only_running=args.only_running, confirm_prompt=not args.no_confirm, include_subdags=not args.exclude_subdags, include_parentdag=not args.exclude_parentdag, )
from airflow.bin import cli直接调用所需的函数由于我的目标是在 DAG A 完成执行时重新运行 DAG B,因此我最终使用 BashOperator 清除了 DAG B:
# Clear the tasks in another dag
last_task = BashOperator(
task_id='last_task',
bash_command= 'airflow clear example_target_dag -c ',
dag=dag)
first_task >> last_task
这是可能的,但如果任务永远不会成功,我会小心进入无限循环的重试。您可以在 on_retry_callback 中调用 bash 命令,您可以在其中指定要清除的任务/dag 运行。
这在 2.0 中有效,因为清除命令已更改
https://airflow.apache.org/docs/apache-airflow/stable/cli-and-env-variables-ref.html#clear
在此示例中,当 t3 最终失败时,我将从 t2 和下游任务中清除:
from airflow import DAG
from airflow.operators.dummy_operator import DummyOperator
from airflow.operators.bash_operator import BashOperator
from datetime import datetime, timedelta
def clear_upstream_task(context):
execution_date = context.get("execution_date")
clear_tasks = BashOperator(
task_id='clear_tasks',
bash_command=f'airflow tasks clear -s {execution_date} -t t2 -d -y clear_upstream_task'
)
return clear_tasks.execute(context=context)
# Default settings applied to all tasks
default_args = {
'owner': 'airflow',
'depends_on_past': False,
'email_on_failure': False,
'email_on_retry': False,
'retries': 1,
'retry_delay': timedelta(seconds=5)
}
with DAG('clear_upstream_task',
start_date=datetime(2021, 1, 1),
max_active_runs=3,
schedule_interval=timedelta(minutes=5),
default_args=default_args,
catchup=False
) as dag:
t0 = DummyOperator(
task_id='t0'
)
t1 = DummyOperator(
task_id='t1'
)
t2 = DummyOperator(
task_id='t2'
)
t3 = BashOperator(
task_id='t3',
bash_command='exit 123',
#retries=1,
on_failure_callback=clear_upstream_task
)
t0 >> t1 >> t2 >> t3