0

我有一个任务,我希望它每天只触发一次。我的问题是时机成熟时它会被触发多次。所以每日任务运行 4 次而不是一次。我设置了许多配置来解决这个问题,包括:

'retries': 1
 catchup=False, max_active_runs=1

我还增加了退休之间的时间,认为气流可能认为任务失败/未开始,因为完成任务可能需要一些时间。

我还根据此答案将应该在该 dag 中运行的所有代码移至 utils 文件夹

但我不知道我在这里错过了什么。有人可以帮忙吗?先感谢您。

这是dag

from datetime import datetime, timedelta
from airflow import DAG
from airflow.operators.python_operator import PythonOperator

from utils.postgres import postgres_backup_to_s3

default_args = {
    'retries': 1,
    'retry_delay': timedelta(minutes=30),#getting backup and uploading to s3 might take some time
    'start_date': datetime(2021, 1, 1)
}

with DAG('postgres_backup', default_args=default_args, schedule_interval='0 19 * * * *',
         catchup=False, max_active_runs=1) as dag:
    postgres_backup_to_s3_task = PythonOperator(task_id="postgres_backup_to_s3", python_callable=postgres_backup_to_s3)
4

1 回答 1

0

如果您的目标是每天晚上 7 点(1900 年)运行一次作业,那么您的目标schedule_interval是不正确的。你想要0 19 * * *

在 Airflow 文档中,它们的示例schedule_intervals由 5 个空格分隔的元素组成:https ://airflow.apache.org/docs/apache-airflow/1.10.1/scheduler.html#dag-runs 。

于 2021-11-09T20:05:27.873 回答