6

我正在研究一个依赖于另一个 DAG 的 DAG。因此,我使用的是 ExternalTask​​Sensor。然而,在使用这个传感器时,我注意到了一些奇怪的行为;

如果将 soft_fail 参数设置为 True(如果任务失败,它会将状态设置为已跳过而不是失败),该任务将永远不会重试。虽然我希望任务重试指定的次数(通过 retries 参数)。如果将 soft_fail 参数设置为 False,它会重试。请参阅下面的最小示例 DAG。

我错过了什么吗?

from airflow import DAG
from airflow.operators.dummy_operator import DummyOperator
from airflow.operators.sensors import ExternalTaskSensor
from datetime import datetime, timedelta


dag_name = 'soft_fail_example'
schedule_interval = "0 * * * *"
default_args = {
            'owner': 'airflow',
            'depends_on_past': False,
            'start_date': datetime(2018, 1, 1),
            'email': [],
            'email_on_failure': False,
            'email_on_retry': False,
            'retries': 1,
            'retry_delay': timedelta(seconds=30),
        }

test_dag = DAG(dag_name, default_args=default_args, schedule_interval=schedule_interval, 
catchup=False, max_active_runs=1)


ets = ExternalTaskSensor(task_id="test_external_task_sensor", dag=test_dag, soft_fail=True, 
timeout=10, retries=5, poke_interval=1, external_dag_id="dependent_dag_id", 
external_task_id="dependent_task_id")

dummy_task = DummyOperator(task_id="collection_task", dag=test_dag)

dummy_task << ets
4

0 回答 0