我有如下的 dag 文件。在这里我没有重试。但是,我想确保特定文件(bash1、bash2)应该重试 1。但不是其他文件。
以下是默认参数。
default_args = {
'owner': 'airflow',
'depends_on_past': False,
'start_date': datetime(2017, 3, 24, 19, 00),
'email': ['myemail@email.com'],
'email_on_failure': True,
'email_on_retry': True,
'retries': 0,
'retry_delay': timedelta(minutes=5),
'backfill': False,
}
我将 dag 定义如下: dag = DAG('x', default_args=default_args, schedule_interval = "15 0,1,2,3,13,14,15,16,17,18,19,20,21,22, 23 * * *")
我的第一个运算符定义如下:
bash1 = BashOperator(
task_id= 'bash1',
bash_command="cd /home/ubuntu/run_scripts/Test/inst/scripts && /usr/bin/Rscript bash1.R ",
dag=dag
)
我的第二个运算符定义如下:
bash2 = BashOperator(
task_id= 'bash2',
bash_command="cd /home/ubuntu/run_scripts/Test/inst/scripts && /usr/bin/Rscript bash2.R ",
dag=dag
)
我的最终运算符定义如下:
Test_join = BashOperator(
task_id= 'Test_join',
bash_command="cd /home/ubuntu/run_scripts/Test/inst/scripts && /usr/bin/Rscript Test_join.R ",
dag=dag
)
Test_join 取决于 bash1 和 bash 2。
Test_join.set_upstream(bash1)
Test_join.set_upstream(bash2)
我应该怎么做才能确保 bash1 和 bash2 退休 2 但不是 Test_join。