I am using Airflow 2.2.0. I am trying to prioritize one run of a DAG so that it finishes completely before other runs of the same DAG make progress; ideally, all tasks of one run complete rather than the same tasks running in parallel across runs. In principle this should work with priority_weight (here set via weight_rule='upstream'). However, as soon as I use a different pool for each task, it no longer works.
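For reference, with the default priority_weight of 1 on every task, my own calculation of the effective priorities for the DAG below (not verified scheduler output):

weight_rule='absolute': every task = 1
weight_rule='upstream': task_1 = 1; task_2, task_4, task_5 = 2; task_3 = 3 (own weight plus all upstream weights)
weight_rule='downstream': task_1 = 5; task_2 = 2; task_3, task_4, task_5 = 1 (own weight plus all downstream weights)

With 'upstream', tasks deeper in an older run should outrank the entry task of a newer run, which is what should let one run finish first.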
# import random
import logging

from airflow.operators.python import PythonOperator  # airflow.operators.python_operator is deprecated in Airflow 2.x
from utils.utils import generate_dag
from time import sleep

LOGGER = logging.getLogger(__name__)
LOGGER.setLevel(logging.DEBUG)
def sleep_and_print(n):
    # Sleep for n seconds total, logging progress once per second.
    for i in range(n):
        LOGGER.info(f"Sleeping: second {i + 1} of {n}")
        sleep(1)
def task1(*args, **kwargs):
    LOGGER.info("task1 started")
    sleep_and_print(10)
    LOGGER.info("task1 finished")
    # if random.randint(0, 1):
    #     raise Exception("task1 failed")

def task2(*args, **kwargs):
    LOGGER.info("task2 started")
    sleep_and_print(2)
    LOGGER.info("task2 finished")

def task3(*args, **kwargs):
    LOGGER.info("task3 started")
    sleep_and_print(10)
    LOGGER.info("task3 finished")

def task4(*args, **kwargs):
    LOGGER.info("task4 started")
    sleep_and_print(15)
    LOGGER.info("task4 finished")

def task5(*args, **kwargs):
    LOGGER.info("task5 started")
    sleep_and_print(10)
    LOGGER.info("task5 finished")
def op_task(dag, task_id, task_func, pool, weight_rule='absolute'):
    # The operator expects the pool *name* as a string; Pool.get_pool()
    # returns a Pool model object, so the name is passed through as-is.
    return PythonOperator(
        task_id=task_id,
        python_callable=task_func,
        weight_rule=weight_rule,
        pool=pool,
        dag=dag,
    )
dag = generate_dag('test_prio')
task_1 = op_task(dag, 'task_1', task1, weight_rule='upstream', pool='test_prio')
# task_1 = op_task(dag, 'task_1', task1, pool='test_prio')
task_2 = op_task(dag, 'task_2', task2, weight_rule='upstream', pool='test_prio2')
task_3 = op_task(dag, 'task_3', task3, weight_rule='upstream', pool='test_prio3')
task_4 = op_task(dag, 'task_4', task4, weight_rule='upstream', pool='test_prio4')
task_5 = op_task(dag, 'task_5', task5, weight_rule='upstream', pool='test_prio5')
task_1 >> task_2 >> task_3
task_1 >> task_4
task_1 >> task_5
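I create the pools up front; a minimal sketch of how I set them up (the slot counts here are an assumption, not necessarily what my environment uses):

from airflow.models.pool import Pool

# One pool per task; 1 slot each is an assumed placeholder value.
for name in ['test_prio', 'test_prio2', 'test_prio3', 'test_prio4', 'test_prio5']:
    Pool.create_or_update_pool(name, slots=1, description=f"pool for {name}")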