0

我有如下的 dag 文件。在这里我没有重试。但是,我想确保特定文件(bash1、bash2)应该重试 1。但不是其他文件。

以下是默认参数。

default_args = {
    'owner': 'airflow',
    'depends_on_past': False,
    'start_date': datetime(2017, 3, 24, 19, 00),
    'email': ['myemail@email.com'],
    'email_on_failure': True,
    'email_on_retry': True,
    'retries': 0,
    'retry_delay': timedelta(minutes=5),
    'backfill': False,
}

我将 dag 定义如下: dag = DAG('x', default_args=default_args, schedule_interval = "15 0,1,2,3,13,14,15,16,17,18,19,20,21,22, 23 * * *")

我的第一个运算符定义如下:

bash1 = BashOperator(
        task_id= 'bash1',
        bash_command="cd /home/ubuntu/run_scripts/Test/inst/scripts && /usr/bin/Rscript bash1.R ",
        dag=dag
    )

我的第二个运算符定义如下:

bash2 = BashOperator(
        task_id= 'bash2',
        bash_command="cd /home/ubuntu/run_scripts/Test/inst/scripts && /usr/bin/Rscript bash2.R ",
        dag=dag
    )

我的最终运算符定义如下:

Test_join = BashOperator(
        task_id= 'Test_join',
        bash_command="cd /home/ubuntu/run_scripts/Test/inst/scripts && /usr/bin/Rscript Test_join.R ",
        dag=dag
    )

Test_join 取决于 bash1 和 bash 2。

Test_join.set_upstream(bash1)
Test_join.set_upstream(bash2)

我应该怎么做才能确保 bash1 和 bash2 退休 2 但不是 Test_join。

4

1 回答 1

0

您可以在声明任务时使用参数“重试”。

于 2017-06-14T12:01:09.077 回答