0

我正在使用气流 2.2.0。我试图优先考虑一次运行以完成同一个 dag 的多次运行。理想情况下,我希望 dag 的所有任务都能完全完成,而不是并行运行相同的任务。

原则上,使用priority_weight它应该可以工作。但是,一旦我为每个任务使用不同的池,这将不再起作用。

# import random
from airflow.models.pool import Pool
from airflow.operators.python_operator import PythonOperator

from utils.utils import generate_dag
from time import sleep


LOGGER = logging.getLogger(__name__)
LOGGER.setLevel(logging.DEBUG)


def sleep_and_print(n):
    for i in range(n):
        LOGGER.info(f"Sleeping for {i} seconds")
        sleep(1)


def task1(*args, **kwargs):
    LOGGER.info("task1 started")
    sleep_and_print(10)
    LOGGER.info("task1 finished")
    # if random.randint(0, 1):
        # raise Exception("task1 failed")


def task2(*args, **kwargs):
    LOGGER.info("task2 started")
    sleep_and_print(2)
    LOGGER.info("task2 finished")


def task3(*args, **kwargs):
    LOGGER.info("task3 started")
    sleep_and_print(10)
    LOGGER.info("task3 finished")


def task4(*args, **kwargs):
    LOGGER.info("task4 started")
    sleep_and_print(15)
    LOGGER.info("task4 finished")


def task5(*args, **kwargs):
    LOGGER.info("task5 started")
    sleep_and_print(10)
    LOGGER.info("task5 finished")


def op_task(dag, task_id, task_func, pool, wait='absolute'):
    return PythonOperator(
        task_id=task_id,
        python_callable=task_func,
        weight_rule=wait,
        pool=Pool.get_pool(pool),
        dag=dag
    )


dag = generate_dag('test_prio')

task_1 = op_task(dag, 'task_1', task1, wait='upstream', pool='test_prio')
# task_1 = op_task(dag, 'task_1', task1)
task_2 = op_task(dag, 'task_2', task2, wait='upstream', pool='test_prio2')
task_3 = op_task(dag, 'task_3', task3, wait='upstream', pool='test_prio3')
task_4 = op_task(dag, 'task_4', task4, wait='upstream', pool='test_prio4') 
task_5 = op_task(dag, 'task_5', task5, wait='upstream', pool='test_prio5')

task_1 >> task_2 >> task_3
task_1 >> task_4
task_1 >> task_5
4

2 回答 2

1

我知道这在每个池中都priority_weight可以在级别上工作,但不能在全球范围内工作。pool我找不到像文档中那样明确的内容,但是Astronomer的本指南中有示例和进一步的解释。

从提到的文章:

池旨在控制任务实例的并行性。相反,如果您希望限制单个 DAG 或所有 DAG 的并发 DagRun 数量,请分别查看 max_active_runs 和 core.max_active_runs_per_dag 参数

因此,这里有一个小例子来测试max_active_runs同一个 DAG 的多个 DagRuns 的工作原理:

文件:

:param max_active_runs : 活动 DAG 运行的最大数量,超过这个运行状态的 DAG 运行数,调度程序不会创建新的活动 DAG 运行

例子:

from datetime import datetime

from airflow import DAG
from airflow.operators.bash import BashOperator
from airflow.operators.dummy import DummyOperator

args = {
    "owner": "airflow",
}

with DAG(
    dag_id="example_max_active_runs",
    default_args=args,
    schedule_interval="@once",
    start_date=datetime(2021, 11, 3),
    max_active_runs=1,
    catchup=False,
    tags=["example", ],
) as dag:

    main_task = BashOperator(
        task_id="main_task",
        bash_command='echo "waiting.." && sleep 30',
    )

    end = DummyOperator(
        task_id="end",
    )

    main_task >> end

从 UI 触发 DAG 3 次,表明由于max_active_runs=1,只有一次运行在执行中,其他运行在队列中:

第一个 DagRun:

example_dag_1st_run

第二次DagRun:

example_dag_2nd_run

于 2021-11-04T00:16:46.030 回答
0

此功能仍在 Airflow 中进行,这里的 github repo 中已经提出了一个问题:https://github.com/apache/airflow/issues/13975#issuecomment-806538498[][1]

于 2021-11-05T08:30:29.220 回答