3

我想在 DAG A完成执行时清除 DAG B 中的任务。A 和 B 都是已调度的DAG。

是否有任何operator/方式来清除任务状态并以编程方式重新运行 DAG B?


我知道用于清除任务的CLI 选项和 Web UI 选项。

4

4 回答 4

3

我建议在这里远离 CLI!

与通过 BashOperator 和/或 CLI 模块相比,在引用对象时,dags/tasks 的气流功能会更好地暴露出来。

将 python 操作添加到名为“clear_dag_b”的dag A ,它从dags文件夹(模块)导入 dag_b,并且:

from dags.dag_b import dag as dag_b

def clear_dag_b(**context):
   exec_date = context[some date object, I forget the name]
   dag_b.clear(start_date=exec_date, end_date=exec_date) 

重要的!如果您出于某种原因不匹配重叠dag_b 调度时间与 start_date/end_date,则 clear() 操作将错过 dag 执行。此示例假定 dag AB的计划相同,并且您只想在A执行第X天时从B清除第X天

在清除之前检查 dag_b 是否已经运行可能是有意义的:

dab_b_run = dag_b.get_dagrun(exec_date) # returns None or a dag_run object
于 2019-12-09T15:00:02.557 回答
2
@cli_utils.action_logging
def clear(args):
    logging.basicConfig(
        level=settings.LOGGING_LEVEL,
        format=settings.SIMPLE_LOG_FORMAT)
    dags = get_dags(args)

    if args.task_regex:
        for idx, dag in enumerate(dags):
            dags[idx] = dag.sub_dag(
                task_regex=args.task_regex,
                include_downstream=args.downstream,
                include_upstream=args.upstream)

    DAG.clear_dags(
        dags,
        start_date=args.start_date,
        end_date=args.end_date,
        only_failed=args.only_failed,
        only_running=args.only_running,
        confirm_prompt=not args.no_confirm,
        include_subdags=not args.exclude_subdags,
        include_parentdag=not args.exclude_parentdag,
    )
  • 查看源代码,您可以
    • 复制它(假设您还想稍微修改一下功能)
    • 或者也许只是from airflow.bin import cli直接调用所需的函数
于 2019-10-01T09:24:42.993 回答
2

由于我的目标是在 DAG A 完成执行时重新运行 DAG B,因此我最终使用 BashOperator 清除了 DAG B:

# Clear the tasks in another dag
last_task = BashOperator(
    task_id='last_task',
    bash_command= 'airflow clear example_target_dag -c ',
    dag=dag)

first_task >> last_task
于 2019-11-11T03:46:00.543 回答
1

这是可能的,但如果任务永远不会成功,我会小心进入无限循环的重试。您可以在 on_retry_callback 中调用 bash 命令,您可以在其中指定要清除的任务/dag 运行。

这在 2.0 中有效,因为清除命令已更改

https://airflow.apache.org/docs/apache-airflow/stable/cli-and-env-variables-ref.html#clear

在此示例中,当 t3 最终失败时,我将从 t2 和下游任务中清除:

from airflow import DAG
from airflow.operators.dummy_operator import DummyOperator
from airflow.operators.bash_operator import BashOperator
from datetime import datetime, timedelta


def clear_upstream_task(context):
    execution_date = context.get("execution_date")
    clear_tasks = BashOperator(
        task_id='clear_tasks',
        bash_command=f'airflow tasks clear -s {execution_date}  -t t2 -d -y clear_upstream_task'
    )
    return clear_tasks.execute(context=context)


# Default settings applied to all tasks
default_args = {
    'owner': 'airflow',
    'depends_on_past': False,
    'email_on_failure': False,
    'email_on_retry': False,
    'retries': 1,
    'retry_delay': timedelta(seconds=5)
}


with DAG('clear_upstream_task',
         start_date=datetime(2021, 1, 1),
         max_active_runs=3,
         schedule_interval=timedelta(minutes=5),
         default_args=default_args,
         catchup=False
         ) as dag:
    t0 = DummyOperator(
        task_id='t0'
    )

    t1 = DummyOperator(
        task_id='t1'
    )

    t2 = DummyOperator(
        task_id='t2'
    )
    t3 = BashOperator(
        task_id='t3',
        bash_command='exit 123',
        #retries=1,
        on_failure_callback=clear_upstream_task
    )

    t0 >> t1 >> t2 >> t3
于 2021-08-19T14:03:59.477 回答