我有一个任务,我希望它每天只触发一次。我的问题是时机成熟时它会被触发多次。所以每日任务运行 4 次而不是一次。我设置了许多配置来解决这个问题,包括:
'retries': 1
catchup=False, max_active_runs=1
我还增加了退休之间的时间,认为气流可能认为任务失败/未开始,因为完成任务可能需要一些时间。
我还根据此答案将应该在该 dag 中运行的所有代码移至 utils 文件夹
但我不知道我在这里错过了什么。有人可以帮忙吗?先感谢您。
这是dag
from datetime import datetime, timedelta
from airflow import DAG
from airflow.operators.python_operator import PythonOperator
from utils.postgres import postgres_backup_to_s3
default_args = {
'retries': 1,
'retry_delay': timedelta(minutes=30),#getting backup and uploading to s3 might take some time
'start_date': datetime(2021, 1, 1)
}
with DAG('postgres_backup', default_args=default_args, schedule_interval='0 19 * * * *',
catchup=False, max_active_runs=1) as dag:
postgres_backup_to_s3_task = PythonOperator(task_id="postgres_backup_to_s3", python_callable=postgres_backup_to_s3)